前言
除了由 DataStream 操作产生的主要流之外,还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。旁路输出在需要拆分数据流时非常有用:如果不使用旁路输出,通常必须先复制该数据流,然后再从每个流中过滤掉不需要的数据。
使用旁路输出时,首先需要定义用于标识旁路输出流的 OutputTag:
// The tag must be created as an anonymous inner class (note the trailing {}) so that
// Flink can analyze the generic type; T is the generic type parameter of the tag.
OutputTag<T> outputTag = new OutputTag<T>("side-output") {};
可以通过以下方法将数据发送到旁路输出:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
代码示例
1.流复制
将流复制两份,分别发送到侧输出流 stream1 和 stream2,代码如下(示例):
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Side-output example that duplicates a stream.
 *
 * <p>Text lines read from a socket are parsed into {@link JSONObject}s. Every parsed
 * record is emitted to two side outputs, "stream1" and "stream2"; each side output then
 * stamps the record with its own name under the "stream" key and prints it.
 */
public class SideOutputTest {
    /** Not referenced anywhere inside this class. */
    public static final String TYPE = "type";

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments; "hostname" and "port" override the socket
     *             endpoint defaults
     * @throws Exception if argument parsing or job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Resolve the socket endpoint from the arguments, falling back to the defaults.
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String hostName = params.get("hostname", "10.68.8.59");
        final int port = params.getInt("port", 9999);

        // A matching test server can be started with: nc -l 9999
        DataStream<String> lines = env.socketTextStream(hostName, port, "\n");
        SingleOutputStreamOperator<JSONObject> parsed =
                lines.map(line -> JSONObject.parseObject(line));

        // One OutputTag per side output; anonymous subclasses keep the element type.
        final OutputTag<JSONObject> firstTag = new OutputTag<JSONObject>("stream1") {
        };
        final OutputTag<JSONObject> secondTag = new OutputTag<JSONObject>("stream2") {
        };

        // Duplicate the stream: every record goes to both side outputs and nothing is
        // written to the main output (the collector is never used).
        // NOTE(review): the same JSONObject instance is handed to both tags and mutated
        // downstream; this presumably relies on Flink copying records between operators
        // (object reuse disabled) — confirm before enabling object reuse.
        SingleOutputStreamOperator<JSONObject> duplicated =
                parsed.process(new ProcessFunction<JSONObject, JSONObject>() {
                    @Override
                    public void processElement(JSONObject record, Context ctx, Collector<JSONObject> unused)
                            throws Exception {
                        ctx.output(firstTag, record);
                        ctx.output(secondTag, record);
                    }
                });

        // Sinks: label each copy with the side output it came from and print it.
        labelAndPrint(duplicated.getSideOutput(firstTag), "stream1");
        labelAndPrint(duplicated.getSideOutput(secondTag), "stream2");

        env.execute("SocketStreamTest");
    }

    /** Stores {@code label} under the "stream" key of every record of {@code branch} and prints the record. */
    private static void labelAndPrint(DataStream<JSONObject> branch, String label) {
        branch.map(record -> {
            record.put("stream", label);
            return record;
        }).print();
    }
}
2.条件分流
可以根据自定义条件将数据分流。
/**
 * Side-output example that splits a stream by a condition.
 *
 * <p>The integers 1 to 5 are routed to an "even" or an "odd" side output depending on
 * their parity, and each side output is printed with a matching prefix.
 */
public class SplitDemo {
    /** Side output receiving the values for which {@code value % 2 == 0}. */
    public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
    /** Side output receiving all remaining values. */
    public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        SingleOutputStreamOperator<Integer> routed = numbers.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
                // Nothing goes through out.collect; ctx.output takes an extra OutputTag
                // argument, and choosing the tag per element is what splits the stream.
                OutputTag<Integer> target = (value % 2 == 0) ? evenTag : oddTag;
                ctx.output(target, value);
            }
        });

        // Each side output is looked up through the tag it was written with.
        DataStream<Integer> evens = routed.getSideOutput(evenTag);
        DataStream<Integer> odds = routed.getSideOutput(oddTag);

        // Print the contents of both side outputs, each with its own prefix.
        evens.process(prefixWith("Even: ")).print();
        odds.process(prefixWith("Odd: ")).print();

        env.execute();
    }

    /** Returns a function that emits every input value as {@code prefix + value}. */
    private static ProcessFunction<Integer, String> prefixWith(final String prefix) {
        return new ProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect(prefix + value);
            }
        };
    }
}
3.迟到数据分流
/**
 * Side-output example that captures late data.
 *
 * <p>Elements carry their event time (milliseconds) in the second tuple field. They are
 * grouped into 5-second tumbling event-time windows with 5 seconds of allowed lateness;
 * elements that arrive after their window has expired are routed to the "late" side
 * output instead of being dropped.
 */
public class OutOfOrderDemo {
    /** Tag of the side output that receives the late elements. */
    public static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // The demo depends on the elements and their watermarks being processed in
        // exactly the order listed below. With a parallelism above 1 the elements are
        // spread over several watermark-generating subtasks, the arrival order at the
        // window operator is no longer deterministic and the operator only advances to
        // the minimum watermark of all subtasks, so "I" would usually not be late.
        executionEnvironment.setParallelism(1);

        // Per-element watermarks: Flink normally emits watermarks periodically based on
        // processing time, but this example runs once and all data arrives practically
        // at the same instant. The watermark trails the highest event time seen so far
        // by 5000 ms, i.e. elements may be up to 5000 ms out of order.
        WatermarkStrategy<Tuple2<String, Integer>> watermarkStrategy =
                WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {
                    @Override
                    public WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {
                        return new WatermarkGenerator<Tuple2<String, Integer>>() {
                            // Highest event timestamp observed so far. Tracking the maximum
                            // keeps the emitted watermark from going backwards when an
                            // out-of-order element (such as "E" after "D") arrives.
                            private long maxTimestamp = 0L;

                            @Override
                            public void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                // Clamp at 0 so early elements do not yield a negative watermark.
                                output.emitWatermark(new Watermark(Math.max(0L, maxTimestamp - 5000L)));
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Intentionally empty: watermarks are emitted per element only.
                            }
                        };
                    }
                    // The second tuple field is the event timestamp.
                }).withTimestampAssigner((element, timestamp) -> element.f1);

        // Sample data: "D" is out of order and "I" is late. When "H" arrives the
        // watermark becomes 15000, meaning everything before 15000 ms is considered
        // complete, so the window that "I" belongs to has already expired.
        SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(
                new Tuple2<>("A", 0),
                new Tuple2<>("B", 1000),
                new Tuple2<>("C", 2000),
                new Tuple2<>("D", 7000),
                new Tuple2<>("E", 3000),
                new Tuple2<>("F", 4000),
                new Tuple2<>("G", 5000),
                new Tuple2<>("H", 20000),
                new Tuple2<>("I", 8000)
        ).assignTimestampsAndWatermarks(watermarkStrategy);

        // Window size 5 s, allowed lateness 5 s. The watermark decides when a window
        // fires; allowedLateness decides which elements count as late and are therefore
        // sent to the side output registered via sideOutputLateData.
        source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(lateTag)
                .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                        // Print a separator followed by the contents of the fired window;
                        // nothing is emitted downstream.
                        System.out.println("--------------------");
                        for (Tuple2<String, Integer> element : elements) {
                            System.out.println(element);
                        }
                    }
                })
                // Print the contents of the late-data side output.
                .getSideOutput(lateTag)
                .process(new ProcessFunction<Tuple2<String, Integer>, Object>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
                        System.out.println("Late element: " + value);
                    }
                });

        executionEnvironment.execute();
    }
}