1. Background

The requirement is to consume data from a Kafka topic, run a word count on it with the Table API, and write the result to Elasticsearch (ES) through a sink table defined in DDL.
[Figure: pipeline overview: Kafka source, Flink Table API WordCount, Elasticsearch sink]

2. Code

Note: the Kafka and ES connection settings in the code could be extracted into a shared utility class; this is only a small demo, and every part of it can be extended as needed. For example, Kafka messages are usually JSON, so in practice you would pull in a tool such as fastjson and do the parsing in a rich function, as sketched below.
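
A minimal sketch of such a rich function, assuming the messages look like {"word":"hello"} and that fastjson is on the classpath; the SensorEvent POJO, its field, and the class names here are hypothetical and should be adapted to your actual payload:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.RichMapFunction;

public class JsonParseDemo {

    // Hypothetical POJO mirroring the assumed JSON layout.
    public static class SensorEvent {
        public String word;
    }

    // Rich function that parses each raw Kafka message into the POJO.
    // open()/close() are available here for heavier setup if needed.
    // Usage in the pipeline: sourceDS.map(new JsonToEvent())
    public static class JsonToEvent extends RichMapFunction<String, SensorEvent> {
        @Override
        public SensorEvent map(String value) throws Exception {
            return JSON.parseObject(value, SensorEvent.class);
        }
    }
}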

package com.flinksql.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;

/**
 * @author: Rango
 * @create: 2021-06-20 10:21
 * @description: Read data from Kafka, compute a word count with the Flink Table API, and write the result to ES
 **/
public class FlinkTableAPI_Test {
    public static void main(String[] args) throws Exception {
        //1. Set up the environments; checkpointing is not configured for this test
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Consume the source data from Kafka
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"BD");
        DataStreamSource<String> sourceDS = env
                .addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), prop));

        //3. flatMap each comma-separated line into (word, 1) tuples; flatMap also allows filtering out unwanted records
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS = sourceDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(",");
                        for (String s : split) {
                            out.collect(new Tuple2<>(s, 1));
                        }
                    }
                });

        //4. Convert the stream into a Table and aggregate per word
        Table table = tableEnv.fromDataStream(flatMapDS);
        Table table1 = table
                .groupBy($("f0"))
                .select($("f0").as("word"), $("f1").sum().as("num"));
        tableEnv.toRetractStream(table1, Row.class).print();

        //5. Create the sink table via DDL; sink.bulk-flush.max-actions is set to 1 so each record is flushed immediately for the demo (by default writes are batched)
        tableEnv.executeSql("CREATE TABLE sensor (" +
                "  word STRING," +
                "  num BIGINT," +
                "  PRIMARY KEY (word) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'elasticsearch-7'," +
                "  'hosts' = 'http://localhost:9200'," +
                "  'index' = 'test'," +
                "  'sink.bulk-flush.max-actions' = '1')");
        //6. Write the aggregated data into the sink table
        table1.executeInsert("sensor");
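        // executeInsert() submits the Table API insert job on its own;
        // env.execute() below is still needed to run the DataStream part
        // (the retract-stream print above).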
        env.execute();
    }
}
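
To sanity-check the job, send a line such as hello,flink,hello to the test topic; the retract-stream print should then show output along these lines (the exact Row formatting differs between Flink versions):

(true,hello,1)
(true,flink,1)
(false,hello,1)
(true,hello,2)

In Elasticsearch, the test index afterwards holds one document per word with its latest count, keyed by the word.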

This post is meant for learning and exchange; if you spot any problems, please feel free to point them out in the comments.
