使用Java nio实现快速合并大量csv文件

需求描述

某项目有一个将小csv文件合并为大csv文件的场景:典型情况每台设备每天产生1440个csv文件(即每分钟一个文件),每个文件大小约100KB,需要将它们合并为一个csv文件。

这样的设备大约有2000台,也就是每天288万个csv文件,按设备合并后应得到2000个csv文件。

问题分析

如果用Apache Commons CSV依次读取小文件再写入大文件,每台设备大约需要20~30秒,2000台设备需要10小时以上,时间太长无法接受。

通过查看样例数据发现,每个设备每天的csv文件表头是完全相同的,跨设备或跨天则不保证表头相同。恰好合并的规则也是按设备按天,因此想到可以利用数据的这个特点优化合并效率。

Java的nio包提供了文件通道(FileChannel)访问文件的方法,允许在两个文件通道间直接传输数据(transferTo),省去了数据在硬件、内核态和用户态之间多次复制的开销(零拷贝)。同时FileChannel允许跳到文件指定位置进行读取,我们可以利用这一点跳过csv的表头区域。

代码实现

以下代码封装了合并csv的逻辑,经测试在普通服务器SAS硬盘下合并1440个文件耗时约1秒,对比之前的20秒提升还是很明显的。

package com.acme;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.channels.FileChannel;

public class MergeCsvDemo {

    final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());

    /**
     * Merge given csv files into one file.
     * @param srcFiles Source csv files, must have same header
     * @param destFile Destination csv file
     * @throws UserException
     */
    public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {

        if (destFile.exists()) {
            throw new UserException("Destination file already exists: " + destFile.getPath());
        }
        if (!destFile.getPath().toUpperCase().contains("CSV")) {
            throw new UserException("Only csv file is allowed: " + destFile.getPath());
        }
        if (srcFiles.length == 0) {
            throw new UserException("Please specify at least one source file");
        }
        if (!destFile.getParentFile().exists()) {
            destFile.getParentFile().mkdirs();
        }

        try {

            // 获取源文件表头长度(假设每个源文件表头相同)
            int headerLength = 0;
            BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
            String line = br.readLine();
            if (line == null) {
                throw new UserException("Empty source file: " + srcFiles[0]);
            }
            headerLength = line.length();
            br.close();

            // 合并文件
            FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
            for (int i = 0; i < srcFiles.length; i++) {
                FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
                // 非第一个文件时,跳过表头
                if (i > 0) {
                    srcChannel.position(headerLength);
                }
                destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
                srcChannel.close();
            }
            destChannel.close();

        } catch (IOException e) {
            e.printStackTrace();
            throw new UserException(e.getMessage());
        }

    }
}

示例下载

为了减小压缩后的文件尺寸,示例里的csv文件是经过脱敏、截取和复制处理的,执行测试用例即可合并80个样例csv文件,合并后约370MB,耗时约0.6秒。

merge-csv-demo.zip

Python处理csv速度慢问题一例和解决

问题现象

一段python代码处理一个60万行csv文件耗时过长,从内存增长图和作业日志上看,处理此csv文件用了十几分钟,但在本地python命令行里测试读取200万行的csv(列数相同)文件只需要不到30秒,因此猜测是在pandas处理此文件数据时存在未优化的代码。

file

分析和解决

经过排查,发现是在讲csv文件里的时间戳转换为datetime类型时,消耗了大量时间,更换timestamp转datetime的函数:

# input[key]['k_ts'] = input[key]['k_ts'].apply(lambda x:pd.to_datetime(x, utc=True, unit='ms').tz_convert('Asia/Shanghai'))

input[key]['k_ts'] = input[key]['k_ts'].apply(lambda x:datetime.datetime.fromtimestamp(x/1000))

同时反方向的datetime转换到timestamp函数也做相应更改:

# output[key][ts_col] = output[key][ts_col].apply(lambda x:x.timestamp() * 1000).astype('int64')

output[key][ts_col] = output[key][ts_col].apply(lambda x:int(time.mktime(x.timetuple())*1e3 + x.microsecond/1e3))

修改后问题解决。