Example environment
java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x
Sample data source (download from the project on Gitee)
Example module (pom.xml)
Flink series: DataStream Connectors and the example module
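The examples rely on Flink's Elasticsearch 6 connector. A minimal sketch of the Maven dependency, assuming Scala 2.11 binaries; the project's own pom.xml (linked above) is authoritative:

<!-- Flink connector for Elasticsearch 6.x; artifact suffix assumes Scala 2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.11.1</version>
</dependency>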
Data stream input
DataStreamSource.java
package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;

/**
 * @Description Reads data from Elasticsearch and emits it into a DataStream.
 */
public class DataStreamSource {

    /**
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>() {

            private RestClientBuilder builder = null;

            // Called once when the job starts; used here to prepare the Elasticsearch connection.
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
            }

            // Runs the query and wraps each hit into a TUser record.
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                RestHighLevelClient client = null;
                // Match query
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                // Specify the index (and mapping type) to search
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                try {
                    client = new RestHighLevelClient(builder);
                    // Note: search(SearchRequest, Header...) is deprecated since ES 6.4;
                    // newer clients use search(request, RequestOptions.DEFAULT) instead.
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("The query returned " + hits.getTotalHits() + " hits");
                    for (SearchHit hit : hits) {
                        Map<String, Object> dataMap = hit.getSourceAsMap();
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    // Get-by-ID query
                    // GetRequest request = new GetRequest("flink_demo", "doc", "NeMaoXQBElQ9wTD5MOfB");
                    // client = new RestHighLevelClient(builder);
                    // GetResponse getResponse = client.get(request, new Header[]{});
                    // Map<String, Object> dataMap = getResponse.getSourceAsMap();
                    // TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                    // ctx.collect(user);
                } catch (IOException ioe) {
                    ioe.printStackTrace();
                } finally {
                    if (client != null) {
                        client.close();
                    }
                }
            }

            // Called when the job is cancelled.
            @Override
            public void cancel() {
                try {
                    super.close();
                } catch (Exception e) {
                    // ignore
                }
                builder = null;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }
}
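Both examples depend on the TUser POJO from the com.flink.examples package, which this article does not list. A hypothetical reconstruction, with fields inferred from the setters used in DataStreamSink below; the real class in the repository may differ:

package com.flink.examples;

// Hypothetical sketch of the TUser POJO used by the examples;
// field names and types are inferred from the setters called in DataStreamSink.
public class TUser {
    private int id;
    private String name;
    private int age;
    private int sex;
    private String address;
    private long createTimeSeries;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public int getSex() { return sex; }
    public void setSex(int sex) { this.sex = sex; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public long getCreateTimeSeries() { return createTimeSeries; }
    public void setCreateTimeSeries(long createTimeSeries) { this.createTimeSeries = createTimeSeries; }
}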
Data stream output
DataStreamSink.java
package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Description Writes a DataStream into Elasticsearch.
 */
public class DataStreamSink {

    /**
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(2);

        // 1. Configure the Elasticsearch connection used to create the index data.
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.110.35", 9200, "http"));

        // Build the ElasticsearchSink; the sink function turns each element into an index request.
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
                        Gson gson = new Gson();
                        Map<String, Object> map = gson.fromJson(user, Map.class);
                        indexer.add(Requests.indexRequest()
                                .index("flink_demo")
                                .type("doc")
                                .source(map));
                    }
                }
        );
        // Maximum number of actions to buffer per bulk request; the sink flushes once this
        // many are buffered (a value of 1 would make it emit after every single element).
        esSinkBuilder.setBulkFlushMaxActions(10);
        // Maximum size, in MB, of buffered data before a flush is triggered.
        esSinkBuilder.setBulkFlushMaxSizeMb(500);
        // Interval, in milliseconds, at which to flush regardless of the number or size of buffered actions.
        esSinkBuilder.setBulkFlushInterval(4000);

        // 2. Build the input stream from a single record.
        TUser user = new TUser();
        user.setId(9);
        user.setName("wang1");
        user.setAge(23);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));

        // 3. Write the stream to Elasticsearch.
        input.addSink(esSinkBuilder.build());
        env.execute("flink data to es job");
    }
}
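With checkpointing enabled as above, the Elasticsearch sink participates in Flink's at-least-once delivery guarantees. The builder can additionally be given a handler for failed requests; a minimal sketch using the retry handler that ships with the connector, to be placed before the esSinkBuilder.build() call:

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// Re-queues requests that Elasticsearch rejected because its bulk queue was full
// (EsRejectedExecutionException); all other failures still fail the sink.
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());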
Data display