一、工程创建与准备

使用maven进行工程创建,且采用提供的flink-quickstart模版,便利很多。😄

$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink  \
-DarchetypeArtifactId=flink-quickstart-java  \
-DarchetypeVersion=1.6.2

本实验的数据采用自拟电影评分数据(userId, movieId, rating, timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共10个档位,timestamp为10000-20000之间的随机数,且数据顺序采用timestamp的升序排列。(2.1-2.6节的数据是乱序)

由于该文只是为了熟悉操作符的用法,所以数据自拟更有针对性。

二、操作符

2.0 Baseline

以下是本次实验的baseline,源source为kafka提供,所以还需要建立一个将数据一个个放入kafka的class。

这里的流对象使用的是POJO类型,即MovieRate类,在之后的各种操作符的使用中也更加方便。

public class Baseline {
    public static void main(String[] args) throws Exception {
        // 1. Get an ExecutionEnvironment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test2");
        properties.setProperty("auto.offset.reset", "earliest");

        // 2. Get the source
        FlinkKafkaConsumer011<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>(
                "test2",
                new MovieRateSchema(),
                properties);
        DataStream<MovieRate> rates = env
                .addSource(myConsumer);

        // 3. Set the sink
        rates.print();

        // 4. Execute
        env.execute();
    }
}
...
1> 50,181,1.5,13667
1> 53,83,1,11838
1> 87,112,3.5,11701
1> 66,199,5,12427
...

2.1 Filter

对读入的每个element执行bool操作,保留返回True的element。

这里,我们新建一个MovieFilter过滤器,对movieId大于100的电影过滤掉。

// Baseline
DataStream<MovieRate> filteredRate = rates
                .filter(new MovieFilter());
// MovieFilter
public static class MovieFilter implements FilterFunction<MovieRate>{
        @Override
        public boolean filter(MovieRate movieRate) throws Exception {
            if (movieRate.movieId > 100){
                return false;
            } else {
                return true;
            }
        }
    }

运行后,可以看到movieId大于100的日志已经被过滤:

...
1> 74,36,3.5,14522
1> 90,46,4.5,14166
1> 3,52,1.5,12222
1> 19,36,1.0,12055
...

2.2 Map

DataStream ➡ DataStream

对流中的每一个元素进行转换。

如下列表示将刚才处理后的每个element的评分✖️2

//Baseline
DataStream<MovieRate> filteredRate = rates
    .filter(new MovieFilter())
    .map(new RateMap());
// RateMap
public static class RateMap implements MapFunction<MovieRate, MovieRate>{

    @Override
    public MovieRate map(MovieRate movieRate) throws Exception {
        movieRate.rate = 2 * movieRate.rate;
        return movieRate;
    }
}

这样的话可以看到评分已经变成的以10为满分的整数了:

...
1> 86,65,6.0,14262
1> 56,32,10.0,19835
1> 36,54,5.0,19076
1> 8,21,4.0,12728
...

另外,map可以让我们很容易的使用lambdas。例如,如果我只想返回每个element的userId的话,我可以这样写:

// Baseline
DataStream<Integer> filteredRate = rates
    .filter(new MovieFilter())
    .map(value -> value.userId);

2.3 FlatMap

DataStream ➡ DataStream

读入一个元素,返回转换后的0个、1个或者多个元素。

事实上,FlatMap可以干Map可以干的任何事情。比如下面的代码是跟上面同样的操作,返回每个element的userId:

// Baseline
DataStream<Integer> filteredRate = rates
    .filter(new MovieFilter())
    .map(new RateMap())
    .flatMap(new FlatMapFunction<MovieRate, Integer>() {
        @Override
        public void flatMap(MovieRate movieRate, Collector<Integer> collector) throws Exception {
            collector.collect(movieRate.userId);
        }
    });

记得有三个地方需要改,上面代码中所有Integer的地方。

2.4 KeyBy

DataStream ➡ KeyedStream

逻辑上将流分区为不相交的分区(partitions),每个分区包含相同key的element。在内部通过hash分区来实现。

这里可以使用一个lambda来表示我们需要分区的key,这里我们的key选取的是rate:

DataStream<MovieRate> filteredRate = rates
    .filter(new MovieFilter())
    .map(new RateMap())
    .keyBy((MovieRate rate) -> rate.rate); // .keyBy(MovieRate::getRate)

所以,按道理来说,拥有不同rate的elements应该出现在不同的partitions中。但是,我指定来最大的partitions数量,所以下面可以看出,有不同rate的elements可能出现在不一样的partitions中,但是有相同rate的elements一定出现在相同的partitions中。

...
1> 86,65,6.0,14262
3> 56,32,10.0,19835
1> 36,54,5.0,19076
4> 8,21,4.0,12728
3> 31,89,8.0,16671
3> 66,54,7.0,19989
4> 42,15,4.0,10613
4> 8,18,4.0,14237
3> 13,68,7.0,18793
3> 11,74,10.0,12379
3> 40,12,8.0,11450
...

2.5 Reduce

KeyedStream ➡ DataStream

在一个KeyedStream上不断进行reduce操作。

下面的例子是将相同的user进行的评分相加:

// Baseline
DataStream<MovieRate> filteredRate = rates
    .filter(new MovieFilter())
    .map(new RateMap())
    .keyBy(MovieRate::getUserId)
    .reduce(new RateReduce());
// RateReduce
public static class RateReduce implements ReduceFunction<MovieRate>{
    @Override
    public MovieRate reduce(MovieRate movieRate, MovieRate t1) throws Exception {
        movieRate.setRate(movieRate.getRate()+t1.getRate());
        return movieRate;
    }
}

可以看到,有一些打印出来的元素他们的rate已经超过了10:

...
4> 16,35,3.0,10545
1> 8,15,11.0,14503
4> 79,33,18.0,12774
4> 88,53,8.0,14387
4> 19,43,13.0,10626
...

2.6 Aggregations

KeyedStream ➡ DataStream

在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值最最小值的所有元素(对于max和maxBy相同)

这次我先把我们的流从POJO类型转换为Tuple4类型,然后再keyBy(userId)后进行sum()的操作。

DataStream<Tuple4<Integer,Integer,Double,Long>> tupleRate = rates
    .map(new MapFunction<MovieRate, Tuple4<Integer, Integer, Double, Long>>() {
       @Override
       public Tuple4<Integer, Integer, Double, Long> map(MovieRate m) throws Exception {
            return Tuple4.of(m.getUserId(),m.getMovieId(),m.getRate(),m.getTimeStamp());
                    }
                })
    .keyBy(0)
    .sum(2);

2.7 Window

这里需要介绍的就很多了,这一节并不对Flink的window机制进行介绍,而还是主要介绍window的基本操作用法。

对keyedStream进行时间窗口处理,返回的是Windowedstream

KeyedStream ➡ WindowedStream

WindowedStream<MovieRate> tupleRate = rates
                .keyBy((MovieRate rate)->rate.userId)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))

这里介绍三种windows:

一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。

  1. Time window

    Time Window 是根据时间对数据流进行分组的。Flink 提出了三种时间的概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间)

    • Event time

      以下例子为事件时间窗口为2毫秒,每个用户在两毫秒时间内对于电影所有评分的平均分。

      public static void main(String[] args) throws Exception {
              // 1. Get an ExecutionEnvironment
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
             //设定事件时间处理模式
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
              Properties properties = new Properties();
              properties.setProperty("zookeeper.connect", "localhost:2181");
              properties.setProperty("bootstrap.servers", "localhost:9092");
              properties.setProperty("group.id", "test");
              properties.setProperty("auto.offset.reset", "none");
      
              // 2. Get the source
              FlinkKafkaConsumerBase<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>(
                      "test2",
                      new MovieRateSchema(),
                      properties)
                      //分配时间戳
                      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MovieRate>() {
                          @Override
                          public long extractAscendingTimestamp(MovieRate movieRate) {
                              return movieRate.timeStamp;
                          }
                      });
      
              DataStream<MovieRate> rates = env
                      .addSource(myConsumer);
      
              DataStream<Tuple2<Integer,Double>>  tupleRate = rates
                      .keyBy((MovieRate rate)->rate.userId)
                      // 设定事件时间窗口为2毫秒
                      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
                      .apply(new WindowFunction<MovieRate, Tuple2<Integer, Double>, Integer, TimeWindow>() {
                                 @Override
                                 public void apply(Integer integer, TimeWindow timeWindow, Iterable<MovieRate> iterable, Collector<Tuple2<Integer, Double>> collector) throws Exception {
                                     double sum = 0;
                                     int count = 0;
                                     for (MovieRate t : iterable){
                                         sum += t.rate;
                                         count += 1;
                                     }
                                     collector.collect(Tuple2.of(integer, sum/count));
                                 }
                             });
    • Processing time

      这就跟当前处理时的前来的事件有关了。相同的例子,只需改一句即可,最前面的环境设置也需要改动

      DataStream<Tuple2<Integer,Double>>  tupleRate = rates
                      .keyBy((MovieRate rate)->rate.userId)
                     // 将Event改为Processing
                      .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                      .apply(...);
  2. Count window

    这种window通过计数两进行分组,比如说每次有100个数据进行一个窗口封装。

  3. Session window

    在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。

更多关于windows的用法,请见另一篇博文:

2.8 Connect & Split & Select

2.8.1 connect

DataStream,DataStream ➡ ConnectedStream

“连接”两个数据流。这两个数据流可以是任意形式的,里面的元素不一定要格式一样。

我们可以这样想象,现在我们的数据流是某个用户对于某个电影在某个时间点的评分。就豆瓣而言,评分完成后还可以该电影写短评,这就又构成了一个数据流。把这两个流连接并进行匹配将会是该操作符的一个应用。

2.8.2 split & select

DataStream ➡ SplitStream ➡ DataStream

有连接就自然有分割,该操作符将一个数据流分割成若干个数据流。

下面的例子,同样的数据集,将userId大于50和小于50的分成old和young两个数据集,打印young数据集。select操作符用于选择Splitstream中特定的stream。

SplitStream<MovieRate> splitRate = rates
                .split(new OutputSelector<MovieRate>() {
                    @Override
                    public Iterable<String> select(MovieRate movieRate) {
                        // 这个list储存每个splitstream的标签
                        List<String> output = new ArrayList<String>();
                        if (movieRate.userId < 50){
                            output.add("young");
                        } else{
                            output.add("old");
                        }
                        return output;
                    }
                });
DataStream<MovieRate> young = splitRate.select("young");
young.print()

可见,输出中只包含了userId小于50的数据:

1> 30,67,2.5,10338
1> 15,172,1.0,10338
1> 37,154,0.5,10338
1> 37,172,1.0,10339
1> 31,66,1.0,10339
...

2.9 Iterate

DataStream ➡ IterativeStream ➡ DataStream

通过重定向操作符,建立一个回馈循环。在建立迭代算法的时候想必是很有用的。

使用上一节产生的young流,每次迭代将每个record的分数减半,直到分数不大于1为止,并返回不大于1时最后的结果。

IterativeStream<MovieRate> iteration = young.iterate();
// 每次迭代的操作
DataStream<MovieRate> iterationBody = iteration.map(new MapFunction<MovieRate, MovieRate>() {
    @Override
    public MovieRate map(MovieRate movieRate) throws Exception {
        movieRate.rate = movieRate.rate / 2;
        return movieRate;
    }
});
// 迭代结束的标志
DataStream<MovieRate> feedback = iterationBody.filter(new FilterFunction<MovieRate>() {
    @Override
    public boolean filter(MovieRate movieRate) throws Exception {
        return movieRate.rate > 1;
    }
});
// 建立迭代
iteration.closeWith(feedback);
// 由于iterationBody包含所有迭代中间操作的数据,这里需要迭代之后的过滤
DataStream<MovieRate> output = iterationBody.filter(new FilterFunction<MovieRate>() {
    @Override
    public boolean filter(MovieRate movieRate) throws Exception {
        return movieRate.rate <= 1;
    }
});

output.print();

可以看到,输出的records分数都小于1:

...
1> 12,132,0.625,10500
1> 48,13,0.875,10500
1> 22,128,0.75,10500
1> 9,53,0.5,10501
1> 3,58,0.5,10501
...

2.10 Extract Timestamps

DataStream ➡ DataStream

提取时间戳,也可以说是分配时间戳。这个在2.7节的代码中也有提到,当要使用事件时间作为窗口依据时,在原数据中通常都会有一列代表着时间戳,这个时候需要在Datastream流中声明该列为时间戳列。嗯,就是这样。

stream.assignTimestamps(new TimeStampExtractor(){...});

2.11 Project

DataStream ➡ DataStream

在tuple流中选取指定的列作为新的流。

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Reference:

  1. Flink 原理与实现:Window 机制
  2. Flink官方网站
01-22 14:38