示例环境

java.version: 1.8.x
flink.version: 1.11.1
elasticsearch.version: 6.x

示例数据源 (项目码云下载)

Flink 系列 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系列 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;
/**
 * Reads documents from Elasticsearch and emits them into a Flink DataStream.
 *
 * <p>The job runs a single match query against the {@code flink_demo} index,
 * converts every hit into a {@code TUser} and prints the resulting stream.
 */
public class DataStreamSource {
    /**
     * Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    /**
     * Builds and runs the job: Elasticsearch query -> {@code TUser} stream -> stdout.
     *
     * @param args unused
     * @throws Exception if the job fails, including when the Elasticsearch query fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>() {
            // The builder is not serializable; it is created in open() on the task side,
            // so it must never travel with the serialized function.
            private transient RestClientBuilder builder;
            // Flipped by cancel() (called from another thread) to stop the emit loop.
            private volatile boolean running = true;

            /** Called once before run(); prepares the connection settings. */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
            }

            /**
             * Executes the query and emits every hit as a {@code TUser}.
             *
             * <p>NOTE: no explicit size is set on the search, so Elasticsearch returns
             * only its default first page of hits; the total printed below may be
             * larger than the number of records actually emitted.
             *
             * @throws Exception if the search fails; the error is propagated so that
             *                   the job fails instead of finishing silently with no data
             */
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                // Match query
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                // Target index and type
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                // try-with-resources closes the client on both success and failure.
                try (RestHighLevelClient client = new RestHighLevelClient(builder)) {
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("查询结果有" + hits.getTotalHits() + "条");
                    for (SearchHit hit : hits) {
                        if (!running) {
                            break;
                        }
                        Map<String, Object> dataMap = hit.getSourceAsMap();
                        // Round-trip through JSON to bind the source map onto the POJO.
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    // Alternative: fetch a single document by ID
                    //   GetRequest getRequest = new GetRequest("flink_demo", "doc", "NeMaoXQBElQ9wTD5MOfB");
                    //   GetResponse getResponse = client.get(getRequest, new Header[]{});
                    //   Map<String, Object> dataMap = getResponse.getSourceAsMap();
                    //   TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                    //   ctx.collect(user);
                }
            }

            /** Called when the job is cancelled; asks run() to stop emitting. */
            @Override
            public void cancel() {
                running = false;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }

}

数据流输出

DataStreamSink.java

package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description 将DataStream数据流输出到elasticsearch中
 */
public class DataStreamSink {

    /**
     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(2);
        //1.设置Elasticsearch连接,创建索引数据
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.110.35", 9200, "http"));
        //创建数据源对象 ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        Gson gson = new Gson();
                        Map<String,Object> map = gson.fromJson(user, Map.class);
                        indexer.add(Requests.indexRequest()
                                .index("flink_demo")
                                .type("doc")
                                .source(map));
                    }
                }
        );
        // 设置批量写数据的最大动作量,对批量请求的配置;这指示接收器在每个元素之后发出,否则它们将被缓冲
        esSinkBuilder.setBulkFlushMaxActions(10);
        //刷新前缓冲区的最大数据大小(以MB为单位)
        esSinkBuilder.setBulkFlushMaxSizeMb(500);
        //论缓冲操作的数量或大小如何都要刷新的时间间隔
        esSinkBuilder.setBulkFlushInterval(4000);

        //2.写入数据到流中
        //封装数据
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang1");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
        //3.将数据写入到Elasticearch中
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }

}

数据展示

Flink 系列 之 Connectors 连接 ElasticSearch-LMLPHP

05-05 10:31