Java读写本地Parquet格式数据文件

Apache Parquet是大数据平台里广泛使用的一种开源的列式文件存储格式,MapReduce和Spark等计算框架都内置了对读写Parquet文件的支持,通常Parquet文件放在HDFS上使用。

有时我们需要用Java直接读写本地的Parquet文件,在没有MapReduce或Spark等工具的情况下,要实现读写Parquet文件可以借助hadoop-clientparquet-hadoop这两个包实现。

一、依赖类库

首先需要在Java工程的pom.xml里添加下面的依赖项(引入hadoop-client会显著增大fat jar包的体积,但目前没有很好的替代方案):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.1</version>
</dependency>

二、将数据写入Parquet文件

Parquet官方提供了org.apache.parquet.example.data.Group作为一条记录的对象,这里演示以此对象写入parquet文件的方法。为了简化示例代码,parquet文件里每一列的类型都使用整型。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.*;

import java.io.IOException;

/**
 * 示例程序:数据以 org.apache.parquet.example.data.Group 的形式写入Parquet文件
 */
public class WriteParquetDemoGroup {
    Configuration conf;

    public WriteParquetDemoGroup() {
        conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        conf.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
    }

    public void writeParquet(int numRows, String[] fields, Path parquetPath) throws IOException {

        Types.MessageTypeBuilder schemaBuilder = Types.buildMessage();
        for (int j = 0; j < fields.length; j++) {
            schemaBuilder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, fields[j]));
        }
        MessageType schema = schemaBuilder.named("record");

        GroupWriteSupport.setSchema(schema, conf);
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        writeSupport.init(conf);

        ParquetWriter<Group> writer = null;
        try {
            writer = new ParquetWriter<Group>(parquetPath,
                    writeSupport,
                    CompressionCodecName.SNAPPY,
                    ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
                    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                    ParquetWriter.DEFAULT_WRITER_VERSION,
                    conf);

            for (int i = 0; i < numRows; i++) {
                Group group = new SimpleGroupFactory(schema).newGroup();
                for (int j = 0; j < fields.length; j++) {
                    group.add(fields[j], i * j);
                }
                writer.write(group);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

三、从Parquet文件读取数据

读取Parquet文件的代码本身很简单,只是要特别注意一点,为了能发挥Parquet列式存储的优势,应将要读取的列配置到PARQUET_READ_SCHEMA参数,以便跳过其他不需要扫描的列,从而提高读取性能。

public void readParquetWithReadSchema(Path parquetPath, String[] queryFields) throws IOException {
    // 将要读取的列配置到PARQUET_READ_SCHEMA,如果缺失这一步读取性能将严重降低
    Types.MessageTypeBuilder builder = Types.buildMessage();
    for (int j = 0; j < queryFields.length; j++) {
        builder.addField(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, queryFields[j]));
    }
    MessageType messageType = builder.named("record");
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, messageType.toString());

    // 读取Parquet文件
    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetReader.Builder<Group> readerBuilder = ParquetReader.builder(readSupport, parquetPath);
    ParquetReader<Group> reader = readerBuilder.withConf(conf).build();
    Group line = null;
    while ((line = reader.read()) != null) {
        for (String field : queryFields) {
            line.getInteger(field, 0);
        }
    }
}

四、性能测试

写了一个简单的测试用例ParquetDemoTest,验证不同条件下上面的代码写入和读取Parquet文件的耗时。运行环境是普通笔记本电脑,i5 CPU + SSD硬盘。

写入性能:固定每行500列,可以看到写入parquet文件的耗时与写入的行数成正比;

读取性能:从parquet文件中读取少量列时速度是很快的,读取耗时与读取的列数成正比;

错误读取:当没有配置PARQUET_READ_SCHEMA时,读取多少列耗时都与读取500列差不多,未能体现列式存储的优势。

写入Parquet文件(Group), 100 行 x 500 列, 耗时 1580 ms
写入Parquet文件(Group), 500 行 x 500 列, 耗时 6927 ms
写入Parquet文件(Group), 1000 行 x 500 列, 耗时 12424 ms
写入Parquet文件(Group), 2000 行 x 500 列, 耗时 25849 ms
写入Parquet文件(Group), 3000 行 x 500 列, 耗时 36799 ms

读取Parquet文件(过滤列), 3000 行 x 5 列, 耗时 180 ms
读取Parquet文件(过滤列), 3000 行 x 10 列, 耗时 202 ms
读取Parquet文件(过滤列), 3000 行 x 15 列, 耗时 171 ms
读取Parquet文件(过滤列), 3000 行 x 50 列, 耗时 258 ms
读取Parquet文件(过滤列), 3000 行 x 100 列, 耗时 504 ms
读取Parquet文件(过滤列), 3000 行 x 200 列, 耗时 1608 ms
读取Parquet文件(过滤列), 3000 行 x 300 列, 耗时 2544 ms
读取Parquet文件(过滤列), 3000 行 x 400 列, 耗时 3998 ms
读取Parquet文件(过滤列), 3000 行 x 500 列, 耗时 6022 ms

读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6188 ms
读取Parquet文件(未过滤列), 3000 行 x 5 列, 耗时 6795 ms
读取Parquet文件(未过滤列), 3000 行 x 10 列, 耗时 6717 ms
读取Parquet文件(未过滤列), 3000 行 x 15 列, 耗时 6268 ms
读取Parquet文件(未过滤列), 3000 行 x 50 列, 耗时 6311 ms
读取Parquet文件(未过滤列), 3000 行 x 100 列, 耗时 7317 ms
读取Parquet文件(未过滤列), 3000 行 x 200 列, 耗时 6637 ms
读取Parquet文件(未过滤列), 3000 行 x 300 列, 耗时 6676 ms
读取Parquet文件(未过滤列), 3000 行 x 400 列, 耗时 7225 ms
读取Parquet文件(未过滤列), 3000 行 x 500 列, 耗时 6928 ms

五、代码下载

文中使用的代码:parquet-demo-1.0.0-src.zip

参考资料

https://www.arm64.ca/post/reading-parquet-files-java/

定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?

大数据系统的若干瓶颈

一、Zookeeper里watch的上限

虽然zk本身没有对watch数量设置上限,但在实际场景里,由于watch数量过多导致系统资源被耗尽的情况偶有发生。

以一个实际场景为例,这个场景里有30000个设备,在zk里每个设备对应一个znode,然后有storm的topology对每个znode加上3个watch(通过curator,一个spout两个bolt),这个topology的并发是200。

计算一下总的watch数量就是30000x200x3=1800万个,按照ZOOKEEPER-1177的描述,平均每个watch占用100字节左右内存,1800万个watch大约占用1.8GB内存。这时必须相应调高zk能够使用的内存数量。

在命令行使用 “echo wchs | nc zkaddress port” 命令可以查看当前zk里watch数量,相当于先telnet到zk再发送zk提供的四字命令(完整命令列表):

zookeeper_watches

二、Impala分区数量的上限

根据Cloudera的建议(见:Impala maximum number of partitions),一个impala表最多使用10万个分区(partition),最好不超过1万个分区。

在实际场景里,假设我们想按“设备ID”和“天”对设备数据进行分类,那么当有20000台设备时,每年所需要的分区数量是20000x365=7300000个,已经大大超出了impala的限制,这时就要考虑调整分区粒度,比如从时间维度调整为每个月分一个区,从设备维度调整为将若干个设备分为一组再以组为单位分区。但无论如何,这些调整通常对业务应用是有代价,需要衡量是否能够接受。

要统计一个表有多少个分区,可以使用explain语句:

impala_partitions

要查看详细分区信息,使用show partitions mytable语句:

impala_show_partitions

三、Impala文件数量限制

(注:经过核实,此问题在通过JDBC且SYNC_DDL时出现,impala-shell里REFRESH通常不会超时)

直接修改HDFS上的文件后,需要使用Impala的REFRESH命令更新Impala元数据。当文件数量过多(例如200万个),REFRESH命令会超时。

解决方法:按Partition依次执行REFRESH命令,只要每个Partition的文件数量不多,就可以实现更新整个表的元数据。

REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

四、HDFS文件数量限制

HDFS最著名的限制是namenode单点失效问题。

根据cloudera博客文章The Small Files Problem的解释,每个文件、每个目录以及每个块(block)会在namenode节点占用150字节内存,假设有一千万个文件,每个文件一个块,则总共占用20000000*150=3GB物理内存。

仍然以设备数据为例,假设我们想把每个设备的数据按天保存到hdfs文件,那么20000个设备每年产生730万个文件,三年2200万个文件,是硬件资源可以承受的数量级。但如果业务要求对设备的不同类别数据分文件存放,例如设备的高频数据与低频数据,则文件总数量还要乘以数据类型的个数,这时就必须考虑namenode物理内存是否够用。

Hortonworks根据文件数量推荐的namenode内存配置表如下(来源),可以看出1000万文件建议配置5.4GB,比前面估计的值(3GB)稍高,这应该是考虑到namenode服务器额外开销和一定冗余度的数值:

hortonworks_namenode_heapsize

使用“hadoop fs -count /”命令可以统计当前hdfs上已有文件数量,不过这个命令无法看到块的数量。

hadoop_file_count2

五、Parquet文件列数限制

Parquet是基于列的存储格式,最大优势是从文件中抽取小部分列时效率很高,同时Impala、Hive和Spark等大数据查询/分析引擎都支持它,所以不少大数据系统底层都是用parquet做数据存储。

由于parquet不支持对已有文件的修改,因此在设计系统时就要考虑文件里包含哪些列,就像为数据库设计表结构一样。值得关注的问题是,一个parquet文件里能包含的列数目是有限的,至于上限值是多少与多个因素有关,很难给出一个确切的数字。我查到的一些建议是尽量不要超过1000个列。

例如在PARQUET-222的讨论里,一个例子是假设一个parquet文件有2.6万个整数类型的列,因为每个writer至少需要64KB x 4的内存,写入这样一个parquet文件至少需要6.34GB内存,很容易超出jvm限制。

(三个月前遇到这个问题,当时没有及时记录,现在找资料又花了好几个小时,这次赶紧记下)

参考 PARQUET-222 和 PARQUET-394