示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.kafka;
import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
* @Description 从Kafka中消费数据
*/
public class DataStreamSource {
/**
* 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度(使用几个CPU核心)
env.setParallelism(1);
//每隔2000ms进行启动一个检查点
env.enableCheckpointing(2000);
//设置模式为exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有进行500 ms的进度
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//1.消费者客户端连接到kafka
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
//setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息
//consumer.setStartFromEarliest();
//Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
//consumer.setStartFromTimestamp(1559801580000L);
//Flink从topic中最新的数据开始消费
//consumer.setStartFromLatest();
//Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
//consumer.setStartFromGroupOffsets();
//2.在算子中进行处理
DataStream<TUser> sourceStream = env.addSource(consumer)
.filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
.map((MapFunction<String, TUser>) value -> {
System.out.println("print:" + value);
//注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时,
//会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。
//因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。
Gson gson = new Gson();
try {
TUser user = gson.fromJson(value, TUser.class);
return user;
}catch(Exception e){
System.out.println("error:" + e.getMessage());
}
return new TUser();
})
.returns(TUser.class);
sourceStream.print();
//3.执行
env.execute("flink kafka source");
}
}
数据流输出
DataStreamSink.java
package com.flink.examples.kafka;
import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* @Description 将生产者数据写入到kafka
*/
public class DataStreamSink {
/**
* 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//必需设置setParallelism并行度,否则不会输出
env.setParallelism(1);
//每隔2000ms进行启动一个检查点
env.enableCheckpointing(2000);
//设置模式为exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有进行500 ms的进度
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//1.连接kafka
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props);
//2.创建数据,并写入数据到流中
TUser user = new TUser();
user.setId(8);
user.setName("liu3");
user.setAge(22);
user.setSex(1);
user.setAddress("CN");
user.setCreateTimeSeries(1598889600000L);
DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
//3.将数据流输入到kafka
sourceStream.addSink(producer);
sourceStream.print();
env.execute("flink kafka sink");
}
}
- 在kafka上创建名称为test的topic
- 先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流
数据展示