flink作为一个大数据框架,已经由阿里充分的证实了其性能和前景。但对国内仍然是一个比较陌生的状态,无论是开源的文档和实例都比较缺乏。之前找到的demo很多都是旧版本;同时flink本身面临版本演进,blink开源等一些影响,也会在之后出现一些比较大的变化。

我根据目前的资料,编写了基于flink的Kafka生产消费demo,便于初步的了解flink的特性,也希望能对他人了解flink提供一些小小的帮助。

以下demo是我根据一些其他的demo整合而成,引用申明见最下方。

 


导包

flink的导包是常见的核心+拓展模式

flink-java是flink的核心实现.我使用的是(看起来比较成熟的1.5.0,可能1.6.0更合适)

flink-streaming-java_2.11是使用flink流接入的包

flink-connector-kafka-0.10_2.11是为了使用Kafka开发的中间件,而使用这个包必须保证版号对应,及使用kafka0.10和flink流2.11

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>

写入(生产者)

源为一个循环的数据产生器,输出为对应的kafka topic

public class WriteIntoKafka {
    public static void main(String[] args) throws Exception {
        // create execution environment
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map properties= new HashMap();
        properties.put("bootstrap.servers", "/*服务地址*/");
        properties.put("topic", "/*topic*/");

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromMap(properties);

        // add a simple source which is writing some strings
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());

        // write stream to Kafka
        messageStream.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
                parameterTool.getRequired("topic"),
                new SimpleStringSchema()));

        messageStream.rebalance().map(new MapFunction<String, String>() {
            //序列化设置
            private static final long serialVersionUID = 1L;

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });

        messageStream.print();

        env.execute();
    }

    public static class SimpleStringGenerator implements SourceFunction<String> {
    	//序列化设置
        private static final long serialVersionUID = 1L;
        boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while(running) {
                ctx.collect(prouderJson());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
 }

读取(消费者)

采用Kafka消费者作为源,通过MapFunction转换后输出

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map properties= new HashMap();
        properties.put("bootstrap.servers", "/*服务地址*/");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topic", "/*topic*/");
        // parse user parameters

        ParameterTool parameterTool = ParameterTool.fromMap(properties);

        FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
                         parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());


        DataStream<String> messageStream = env
                .addSource(consumer010);

        // print() will write the contents of the stream to the TaskManager's standard out stream
        // the rebelance call is causing a repartitioning of the data so that all machines
        // see the messages (for example in cases when "num kafka partitions" < "num flink operators"
        messageStream.rebalance().map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });


 messageStream.print();

 env.execute();
}
}

细节

1,配置我采用了 Map properties= new HashMap(); 这种在程序内部写入的方式。

也可以采用读取文件的配置的方式。

3,另一种配置方式,可以对FlinkKafkaConsumer010进行配置,具体使用待补充

采用Kafka为data Source使用

2,新旧版本的最大区别就是 使用FlinkKafkaProducer010和FlinkKafkaProducer010作为生产者和消费者的实例,具体方法的使用和之前的版本有些差别

4,生产者源的产生方式是值得研究的,也应该有其他的产生方式。很多实现细节没有弄清楚(待补充)

5.消费者转换的MapFunction感觉仍有不清楚的地方,这一步可以实现什么逻辑,适合写什么样的逻辑呢(待补充)

6,输出细节,可以在 实现中输出,也可以使用messageStream.print(),进行输出,具体格式可以自行验证。


问题

以上两个demo是调用flink—Kafka的api实现的,但是仍有一些的问题,没有弄清:

1,关于生产者,如何自定义发送数据格式,如byte[],如何自定义发送的分区

2,关于消费者kafka低级api的使用,seek2end,拉取指定分区,手动提交偏移量等设置的实现


引用

https://www.jianshu.com/p/f9d447a3c48f

https://blog.csdn.net/lmalds/article/details/51780950

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

github中的某段代码(找了很久,忘了出处了)

10-07 11:44