需求描述
某项目有一个将小csv文件合并为大csv文件的场景:典型情况每台设备每天产生1440个csv文件(即每分钟一个文件),每个文件大小约100KB,需要将它们合并为一个csv文件。
这样的设备大约有2000台,也就是每天288万个csv文件,按设备合并后应得到2000个csv文件。
问题分析
如果用Apache Commons CSV依次读取小文件再写入大文件,每台设备大约需要20~30秒,2000台设备需要10小时以上,时间太长无法接受。
通过查看样例数据发现,每个设备每天的csv文件表头是完全相同的,跨设备或跨天则不保证表头相同。恰好合并的规则也是按设备按天,因此想到可以利用数据的这个特点优化合并效率。
Java的nio包提供了文件通道(FileChannel)访问文件的方法,允许在两个文件通道间直接传输数据(transferTo),省去了数据在硬件、内核态和用户态之间多次复制的开销(零拷贝)。同时FileChannel允许跳到文件指定位置进行读取,我们可以利用这一点跳过csv的表头区域。
代码实现
以下代码封装了合并csv的逻辑,经测试在普通服务器SAS硬盘下合并1440个文件耗时约1秒,对比之前的20秒提升还是很明显的。
package com.acme;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.channels.FileChannel;
public class MergeCsvDemo {
final static Logger logger = LoggerFactory.getLogger(MergeCsvDemo.class.getName());
/**
* Merge given csv files into one file.
* @param srcFiles Source csv files, must have same header
* @param destFile Destination csv file
* @throws UserException
*/
public static void mergeCsvFiles(File[] srcFiles, File destFile) throws UserException {
if (destFile.exists()) {
throw new UserException("Destination file already exists: " + destFile.getPath());
}
if (!destFile.getPath().toUpperCase().contains("CSV")) {
throw new UserException("Only csv file is allowed: " + destFile.getPath());
}
if (srcFiles.length == 0) {
throw new UserException("Please specify at least one source file");
}
if (!destFile.getParentFile().exists()) {
destFile.getParentFile().mkdirs();
}
try {
// 获取源文件表头长度(假设每个源文件表头相同)
int headerLength = 0;
BufferedReader br = new BufferedReader(new FileReader(srcFiles[0]));
String line = br.readLine();
if (line == null) {
throw new UserException("Empty source file: " + srcFiles[0]);
}
headerLength = line.length();
br.close();
// 合并文件
FileChannel destChannel = new FileOutputStream(destFile, true).getChannel();
for (int i = 0; i < srcFiles.length; i++) {
FileChannel srcChannel = new FileInputStream(srcFiles[i]).getChannel();
// 非第一个文件时,跳过表头
if (i > 0) {
srcChannel.position(headerLength);
}
destChannel.transferFrom(srcChannel, destChannel.size(), srcChannel.size());
srcChannel.close();
}
destChannel.close();
} catch (IOException e) {
e.printStackTrace();
throw new UserException(e.getMessage());
}
}
}
示例下载
为了减小压缩后的文件尺寸,示例里的csv文件是经过脱敏、截取和复制处理的,执行测试用例即可合并80个样例csv文件,合并后约370MB,耗时约0.6秒。