flink作为一个大数据框架,已经由阿里充分的证实了其性能和前景。但对国内仍然是一个比较陌生的状态,无论是开源的文档和实例都比较缺乏。之前找到的demo很多都是旧版本;同时flink本身面临版本演进,blink开源等一些影响,也会在之后出现一些比较大的变化。
我根据目前的资料,编写了基于flink的Kafka生产消费demo,便于初步的了解flink的特性,也希望能对他人了解flink提供一些小小的帮助。
以下demo是我根据一些其他的demo整合而成,引用申明见最下方。
导包
flink的导包是常见的核心+拓展模式
flink-java是flink的核心实现.我使用的是(看起来比较成熟的1.5.0,可能1.6.0更合适)
flink-streaming-java_2.11是使用flink流接入的包
flink-connector-kafka-0.10_2.11是为了使用Kafka开发的中间件,而使用这个包必须保证版号对应,及使用kafka0.10和flink流2.11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.5.0</version>
</dependency>
写入(生产者)
源为一个循环的数据产生器,输出为对应的kafka topic
public class WriteIntoKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map properties= new HashMap();
properties.put("bootstrap.servers", "/*服务地址*/");
properties.put("topic", "/*topic*/");
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromMap(properties);
// add a simple source which is writing some strings
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
// write stream to Kafka
messageStream.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
parameterTool.getRequired("topic"),
new SimpleStringSchema()));
messageStream.rebalance().map(new MapFunction<String, String>() {
//序列化设置
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
return value;
}
});
messageStream.print();
env.execute();
}
public static class SimpleStringGenerator implements SourceFunction<String> {
//序列化设置
private static final long serialVersionUID = 1L;
boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(running) {
ctx.collect(prouderJson());
}
}
@Override
public void cancel() {
running = false;
}
}
}
读取(消费者)
采用Kafka消费者作为源,通过MapFunction转换后输出
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map properties= new HashMap();
properties.put("bootstrap.servers", "/*服务地址*/");
properties.put("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("topic", "/*topic*/");
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromMap(properties);
FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
DataStream<String> messageStream = env
.addSource(consumer010);
// print() will write the contents of the stream to the TaskManager's standard out stream
// the rebelance call is causing a repartitioning of the data so that all machines
// see the messages (for example in cases when "num kafka partitions" < "num flink operators"
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
return value;
}
});
messageStream.print();
env.execute();
}
}
细节
1,配置我采用了 Map properties= new HashMap(); 这种在程序内部写入的方式。
也可以采用读取文件的配置的方式。
3,另一种配置方式,可以对FlinkKafkaConsumer010进行配置,具体使用待补充
采用Kafka为data Source使用
2,新旧版本的最大区别就是 使用FlinkKafkaProducer010和FlinkKafkaProducer010作为生产者和消费者的实例,具体方法的使用和之前的版本有些差别
4,生产者源的产生方式是值得研究的,也应该有其他的产生方式。很多实现细节没有弄清楚(待补充)
5.消费者转换的MapFunction感觉仍有不清楚的地方,这一步可以实现什么逻辑,适合写什么样的逻辑呢(待补充)
6,输出细节,可以在 实现中输出,也可以使用messageStream.print(),进行输出,具体格式可以自行验证。
问题
以上两个demo是调用flink—Kafka的api实现的,但是仍有一些的问题,没有弄清:
1,关于生产者,如何自定义发送数据格式,如byte[],如何自定义发送的分区
2,关于消费者kafka低级api的使用,seek2end,拉取指定分区,手动提交偏移量等设置的实现
引用
https://www.jianshu.com/p/f9d447a3c48f
https://blog.csdn.net/lmalds/article/details/51780950
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
github中的某段代码(找了很久,忘了出处了)