Series progress so far:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (complete)
- Spark (complete)
- Flink (in progress!)
Chapter Content
In the previous section we covered:
- Flink CEP core components
- CEP application scenarios
- Advantages of CEP
Extracting Timeout Events
When a pattern defines a detection window with the within keyword, partial event sequences may be discarded because they exceed the window length. To handle these timed-out partial matches, the select and flatSelect API calls allow you to specify a timeout handler.
FlinkCEP Development Flow
- Convert the data from the DataSource into a DataStream
- Define a Pattern, and combine the DataStream and the Pattern into a PatternStream
- Transform the PatternStream into a DataStream with operators such as select and process
- Process the resulting DataStream further and sink it to the target store
The select method:

```java
SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        new PatternTimeoutFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent timeout(Map<String, List<PayEvent>> map, long timeoutTimestamp) throws Exception {
                return map.get("begin").get(0);
            }
        },
        new PatternSelectFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
                return map.get("pay").get(0);
            }
        });
```
A select function is applied to every detected pattern sequence: for each match, the provided PatternSelectFunction is invoked, and it must produce exactly one result element.
A timeout function is applied to every timed-out partial pattern sequence: for each partial match, the provided PatternTimeoutFunction is invoked, and it must likewise produce exactly one result element.
You can then obtain the stream of timed-out events from the SingleOutputStreamOperator returned by select, as a side output retrieved with the same OutputTag that was passed to the select call.
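Concretely, the timed-out partial matches are read back from the side output. The following is a sketch only: it assumes the PayEvent type and the patternStream from the select example above, and the OutputTag instance must be the very same object passed to select:

```java
// Hypothetical tag name; must be the same OutputTag instance used in select().
OutputTag<PayEvent> orderTimeoutOutput = new OutputTag<PayEvent>("order-timeout") {};

SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        (PatternTimeoutFunction<PayEvent, PayEvent>) (map, timeoutTs) -> map.get("begin").get(0),
        (PatternSelectFunction<PayEvent, PayEvent>) map -> map.get("pay").get(0));

// Fully matched sequences flow in the main stream;
// timed-out partial matches are pulled from the side output:
DataStream<PayEvent> timedOut = result.getSideOutput(orderTimeoutOutput);
timedOut.print("timeout");
```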
Non-deterministic Finite Automaton
At runtime, FlinkCEP compiles the user's pattern logic into such an NFA graph (an NFA object).
A finite state machine therefore works by starting from the start state and transitioning automatically according to each input.
The state machine in the figure above detects whether a binary number contains an even number of 0s. As the figure shows, the only possible inputs are 1 and 0.
Starting from state S1, only an input of 0 transitions the machine to S2; likewise, from S2 only an input of 0 transitions it back to S1. So once the binary input is exhausted, if the machine is in the final state, i.e. it stops in S1, the binary number contains an even number of 0s.
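The two-state machine described above can be sketched in plain Java (the class and method names are mine, not part of Flink). S1 is modeled as a boolean "even zeros so far", and each '0' toggles between S1 and S2 while '1' self-loops:

```java
// A two-state DFA that accepts binary strings containing an even number of '0's.
// State S1 = "even number of zeros so far" (the accepting state),
// State S2 = "odd number of zeros so far".
public class EvenZerosDfa {
    public static boolean acceptsEvenZeros(String bits) {
        boolean inS1 = true; // start in S1: zero '0's seen, and zero is even
        for (char c : bits.toCharArray()) {
            if (c == '0') {
                inS1 = !inS1; // a '0' toggles between S1 and S2
            }
            // a '1' keeps the current state (self-loop on both states)
        }
        return inS1; // accept iff the machine stops in S1
    }

    public static void main(String[] args) {
        System.out.println(acceptsEvenZeros("1001")); // two zeros -> prints true
        System.out.println(acceptsEvenZeros("10"));   // one zero  -> prints false
    }
}
```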
CEP Development Flow
The FlinkCEP development flow:
- Convert the data from the DataSource into a DataStream, assign a Watermark, and apply keyBy
- Define a Pattern, and combine the DataStream and the Pattern into a PatternStream
- Transform the PatternStream into a DataStream with operators such as select and process
- Process the resulting DataStream further and sink it to the target store
Adding the Dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
```
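As an aside (not from the original text): starting with Flink 1.15, the Scala-version suffix was dropped from this module, so on newer Flink versions the artifact is simply:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep</artifactId>
    <version>${flink.version}</version>
</dependency>
```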
Case 1: Malicious Login Detection
Find accounts with consecutive failed logins within 5 seconds.
The test data:

```java
new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)
```
Overall Approach
- Obtain the data
- Assign a Watermark on the data source
- Key the watermarked stream by user ID with keyBy
- Build the Pattern
- Match the pattern against the stream
- Extract the successfully matched data
Writing the Code
```java
package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );

        // Assign event-time timestamps and periodic watermarks.
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {
                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {
                            final long maxOutOfOrderness = 500L;
                            // Start below any real timestamp; initializing with
                            // Long.MAX_VALUE would make the watermark jump ahead of
                            // every event and declare them all late.
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        // Key the stream by user id so patterns are matched per account.
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });

        // Pattern: a "fail" immediately followed by another "fail", within 5 seconds.
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));

        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);

        // Emit the first event of every matched fail-fail sequence.
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });

        process.print();
        env.execute("FlinkCepLoginTest");
    }
}

class CepLoginBean {
    private Long userId;
    private String operation;
    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
```
Run Results
The program prints:

```
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}
Process finished with exit code 0
```

A screenshot of the run is shown below: