问题描述
使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:
Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
.forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
.withBucketAssigner(new MyBucketAssigner())
.build();
实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。
然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。
解决方法
StreamingFileSink.forBulkFormat()
的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。
以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。
StreamingFileSink:
Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
.forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
.withBucketAssigner(new MyBucketAssigner())
.build();
MyParquetWriterFactory:
static class MyParquetWriterFactory extends ParquetWriterFactory {
public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
super(writerBuilder);
}
@Override
public BulkWriter create(FSDataOutputStream stream) throws IOException {
BulkWriter writer = super.create(stream);
return new MyParquetBulkWriter(writer, stream);
}
}
MyParquetBulkWriter:
/**
* 包装flink的ParquetBulkWriter,修改part文件名格式
*/
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
private FSDataOutputStream stream;
private BulkWriter writer;
private int rowCount;
public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
this.writer = writer;
this.stream = stream;
}
@Override
public void addElement(GenericRecord element) throws IOException {
writer.addElement(element);
rowCount++; //记录计数,结果将作为文件名的一部分
}
@Override
public void flush() throws IOException {
writer.flush();
}
@Override
public void finish() throws IOException {
// 试图在finish()后改名失败,因为finish()后正式文件并没有生成
// 通过反射直接修改stream里的targetFile名称可行
// 这里是修改part文件名的关键部分
try {
Field field = stream.getClass().getDeclaredField("targetFile");
field.setAccessible(true);
File targetFile = (File) field.get(stream);
File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
field.set(stream, renamedTargetFile);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} finally {
writer.finish();
}
}
}
MyParquetBuilder(其中avroSchema是在外部赋值的):
static class MyParquetBuilder implements ParquetBuilder {
@Override
public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
return
ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
}
}
参考资料
Flink streaming - Change part file names when using StreamingFileSink?