公司产品为支持流式数据处理采用了Flink+Kafka的组合,在此记录环境安装部署和开发要点。
安装和启动kafka
从kafka官网下载并解压到本地即可。
建议:如果从远程访问这个kafka,需要修改config/server.properties里的listeners属性为实际ip地址,否则producer发送数据时会提示“Connection to node 0 (localhost/127.0.0.1:9092) could not be established”:
listeners=PLAINTEXT://<ipaddress>:9092
启动kafka(如果是windows环境,将bin改为bin\windows,.sh改为.bat):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
安装flink
从flink官网下载压缩包,解压到本地即可。
启动flink:
bin/start-cluster
启动后访问 localhost:8081 可打开Flink Web Dashboard:
创建flink项目
用maven自动创建项目框架,这一步根据网络情况可能比较慢,耐心等待10分钟左右:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=com.test -DinteractiveMode=false
在生成的pom.xml里添加flink-kafka-connector依赖(注意scala版本要与下载的kafka的scala版本一致):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
我们要处理流式数据,因此在生成的StreamingJob.java基础上修改。
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.1.10.76:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_test", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("c:/temp/kafka-loader"), new SimpleStringEncoder<String>())
.withBucketAssigner(new MyBucketAssigner())
.build();
stream.addSink(sink);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
static class MyBucketAssigner implements BucketAssigner<String, String> {
@Override
public String getBucketId(String element, Context context) {
return "" + element.charAt(0);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
运行flink作业
方式1:在IDE里直接运行main()方法即可,此处不细述。
方式2:先mvn clean package
打成jar包,然后打开localhost:8081
控制台页面,选择左侧的Submit New Job
菜单上传生成的jar包(按前述maven生成的pom.xml配置会自动生成两个jar包,要上传其中比较大的那个fatjar包)。上传成功后点击jar包,再点击Submit
按钮即可。
测试发送数据
bin\windows\kafka-console-producer --broker-list <ipaddress>:9092 --topic flink_test
随机输入一些字符串并按回车键,在/tmp/kafka-loader目录下,应该会按字符串首字母生成相应的目录,里面的文件内容是所输入的字符串。
一些坑
FlinkKafkaConsumer
FlinkKafkaConsumer()
的构造方法里,第二个参数可以是DeserializationSchema
类型,也可以是KafkaDeserializationSchema
类型,之前为了将flink-connector-kafka里自带的JSONKeyValueDeserializationSchema
等(详见链接)转为前者找了半天,其实不用转直接用就可以。
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema
适合kafka消息内容为json格式的,如果不是json格式,比如是逗号分隔的格式,还是自己实现KafkaDeserializationSchema
,并不复杂。比如之前我用SimpleStringSchema
无法获取到消息里的key信息,就需要用flink-connector-kafka提供的Deser:
static class MyKeyedDeserializationSchema implements KafkaDeserializationSchema<String> {
@Override
public boolean isEndOfStream(String s) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return consumerRecord.key() + "," + consumerRecord.value(); // key()是repo名称,将其插入消息体以便后续处理
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
Part文件名
为方便进行数据统计,对parquet文件名做了设计要求,但flink本身没有支持定制parquet文件名称,经过研究找到一个方法单独记录了一篇文章:定制Flink输出的parquet文件名