Flink DataStream API Programming Guide

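Flink DataStream programs are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone or embedded in other programs. The execution can happen in a local JVM or on clusters of many machines. To create your own Flink DataStream program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.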

Example Program

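The following program is a complete, working example of a streaming windowed word count application that counts the words coming from a socket in 5-second windows. You can copy and paste the code to run it locally.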

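import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)                                  // group by the word (field 0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))  // 5-second windows
                .sum(1);                                   // sum the counts (field 1)

        dataStream.print();

        env.execute("Window WordCount");
    }

    // Splits each line into words and emits (word, 1) for every word.
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}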

To run the example program, first start the input stream with netcat from a terminal:

nc -lk 9999

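Just type some words, hitting return for a new line. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).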
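To make the expected output concrete, here is a plain-Java sketch (no Flink dependency) of what the job computes for the lines that fall into a single 5-second window: each line is split on a single space, as Splitter does, and the 1s are summed per word, as keyBy(0) followed by sum(1) do. WindowWordCountModel and countWindow are hypothetical names used only for illustration; the sketch does not model windowing, time, or parallelism.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of one window of the WindowWordCount job.
class WindowWordCountModel {

    // Counts the words in all lines that arrived within one window.
    static Map<String, Integer> countWindow(List<String> linesInWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            // Same tokenization as Splitter: split on a single space.
            for (String word : line.split(" ")) {
                // Equivalent of emitting (word, 1) and summing field 1 per key.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Because split(" ") keeps empty strings between consecutive spaces, typing two spaces in a row yields an empty "word" in both this sketch and the Splitter above.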


Linking with Flink

To write programs with Flink, you need to include the Flink DataStream library corresponding to your programming language in your project.

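The simplest way to do this is to use one of the quickstart scripts: either for Java or for Scala. They create a blank project from a template (a Maven Archetype), which sets up everything for you. To create the project manually, you can use the archetype by calling: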

For Java:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.0-SNAPSHOT

For Scala:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.0-SNAPSHOT

The archetypes work for both stable releases and preview versions (-SNAPSHOT).

If you want to add Flink to an existing Maven project, add the following entries to the dependencies section of your pom.xml file:

For Java:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

For Scala:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Program Skeleton

As presented in the example, Flink DataStream programs look like regular Java programs with a main() method. Each program consists of the same basic parts:

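  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.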

We will now give an overview of each of those steps; please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration customConfiguration)

createRemoteEnvironment(String host, int port, String... jarFiles)
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)

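Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program, it will create a local environment that executes your program on your local machine. If you created a JAR file from your program and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method, and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.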

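For specifying data sources, the execution environment has several methods to read from files, sockets, and external systems. To just read data from a socket (useful also for debugging), you can use: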

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> lines = env.socketTextStream("localhost", 9999);

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

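Once you have a DataStream, you can apply transformations to create a new DataStream, which you can then write to a socket, transform again, combine with other DataStreams, or push to an external system (e.g., a message queue or a file system). You apply transformations by calling methods on DataStream and passing in your own custom transformation function. For example, a map transformation looks like this: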

DataStream<String> input = ...;

DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

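This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.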

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write it to a file, or print it.

writeAsText(String path, ...)
writeAsCsv(String path, ...)
writeToSocket(String hostname, int port, ...)

print()

addSink(...)

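Once you have specified the complete program, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of StreamExecutionEnvironment, this will either execute the program on your local machine or submit it for execution on a cluster.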

env.execute();

The following repeats the program skeleton above, with the examples written in Scala:

As presented in the example, Flink DataStream programs look like regular Scala programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of those steps; please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

def getExecutionEnvironment

def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)

Typically, you only need to use getExecutionEnvironment, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files, sockets, and external systems. To just read data from a socket (useful also for debugging), you can use:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val lines: DataStream[String] = env.socketTextStream("localhost", 9999)

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:

val input: DataStream[String] = ...

val mapped = input.map { x => x.toInt }

This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

writeAsText(path: String, ...)
writeAsCsv(path: String, ...)
writeToSocket(hostname: String, port: Int, ...)

print()

addSink(...)

Once you have specified the complete program, you need to trigger the program execution by calling execute on the StreamExecutionEnvironment. Depending on the chosen execution environment, this will either execute the program on the local machine or submit it for execution on a cluster.

env.execute()


DataStream Abstraction

A DataStream is a possibly unbounded collection of data items of the same type.

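Transformations may return different subtypes of DataStream on which further, specialized transformations are available. For example, the keyBy(…) method returns a KeyedStream, which is a stream of data that is logically partitioned by a key and on which windows can be defined.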


Lazy Evaluation

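All Flink DataStream programs are executed lazily: when the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's plan. The operations are actually executed only when the execution is explicitly triggered by an execute() call on the StreamExecutionEnvironment. Whether the program is executed locally or on a cluster depends on the environment.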

Lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
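The idea of lazy evaluation can be illustrated with a toy, single-threaded sketch: calling map only records a step in a plan, and nothing runs until execute() is called. LazyIntPlan is a hypothetical class; it is not how Flink builds or optimizes its job graph and only mirrors the "build a plan first, run it on execute()" behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model of lazy evaluation; this is not Flink's planner.
class LazyIntPlan {
    private final List<Integer> source;
    private final List<UnaryOperator<Integer>> steps = new ArrayList<>();

    LazyIntPlan(List<Integer> source) {
        this.source = source;
    }

    // Like calling a DataStream transformation: only adds the step to the plan.
    LazyIntPlan map(UnaryOperator<Integer> f) {
        steps.add(f);
        return this;
    }

    // Like env.execute(): runs every recorded step over the source data.
    List<Integer> execute() {
        List<Integer> out = new ArrayList<>();
        for (Integer value : source) {
            Integer v = value;
            for (UnaryOperator<Integer> step : steps) {
                v = step.apply(v);
            }
            out.add(v);
        }
        return out;
    }
}
```

A function passed to map is therefore not invoked while the plan is being assembled; it is invoked once per element only after execute() is called.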

Transformations

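Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.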

This section gives a description of all the available transformations.

Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream → DataStream

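Takes one element and produces zero, one, or more elements. The function returns void and emits its results through the Collector. A flatmap function that splits sentences into words: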

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
KeyBy
DataStream → KeyedStream

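Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See Specifying Keys on how to specify keys. This transformation returns a KeyedStream. The examples below show how to define the key: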

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
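The rolling semantics can be sketched in plain Java, assuming each input element arrives as a (key, value) pair: per key, the first value is emitted unchanged, and every later value is combined with the last reduced value, which is then emitted. RollingReduceModel is a hypothetical helper, not Flink code; it ignores parallelism and how Flink stores the per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of a rolling reduce on a keyed stream.
class RollingReduceModel {

    // keys.get(i) is the key of values.get(i); one output is emitted per input.
    static List<Integer> rollingReduce(List<String> keys, List<Integer> values,
                                       BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastReduced = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            String key = keys.get(i);
            Integer value = values.get(i);
            Integer previous = lastReduced.get(key);
            // The first element of a key is emitted as-is; later ones are combined.
            Integer current = (previous == null) ? value : reducer.apply(previous, value);
            lastReduced.put(key, current);
            emitted.add(current);
        }
        return emitted;
    }
}
```

For the keys a, b, a, a with the values 1, 10, 2, 3 and a summing reducer, the sketch emits 1, 10, 3, 6, that is, one partial sum per incoming element.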
Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums:

keyedStream.fold(0, new FoldFunction<Integer, Integer>() {
  @Override
  public Integer fold(Integer accumulator, Integer value)
  throws Exception {
      return accumulator + value;
  }
});
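Fold differs from reduce in that it starts from an initial value and may produce a different type than its input. The following plain-Java sketch models a rolling fold per key, again assuming a keyed stream of (key, value) pairs; RollingFoldModel is a hypothetical name, and folding integers into a string is chosen only to show the type change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a rolling fold on a keyed stream.
class RollingFoldModel {

    static <T, R> List<R> rollingFold(List<String> keys, List<T> values, R initial,
                                      BiFunction<R, T, R> folder) {
        Map<String, R> accumulators = new HashMap<>();
        List<R> emitted = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            String key = keys.get(i);
            // Every key starts from the same initial value.
            R acc = accumulators.getOrDefault(key, initial);
            R next = folder.apply(acc, values.get(i));
            accumulators.put(key, next);
            emitted.add(next);
        }
        return emitted;
    }
}
```

With the keys a, a, b, the values 1, 2, 3, the initial value "start" and the function (acc, v) -> acc + "-" + v, the sketch emits "start-1", "start-1-2" and "start-3": each key begins again from the initial value. Sharing one initial object across keys is safe here only because strings are immutable; a mutable accumulator would need a fresh copy per key.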
Aggregations
KeyedStream → DataStream

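Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).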

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
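The difference between min and minBy can be shown with a small plain-Java sketch that treats each element as an int array of fields and looks at all elements of one key at once (the Flink operators are rolling and emit after every element). MinVsMinByModel is a hypothetical helper; on ties, this sketch keeps the first element it saw.

```java
import java.util.List;

// Hypothetical plain-Java model of the min / minBy distinction for one key.
class MinVsMinByModel {

    // min: returns only the smallest value found in the given field.
    static int min(List<int[]> elements, int field) {
        int best = Integer.MAX_VALUE;
        for (int[] e : elements) {
            best = Math.min(best, e[field]);
        }
        return best;
    }

    // minBy: returns the whole element that holds the smallest value in the field.
    static int[] minBy(List<int[]> elements, int field) {
        int[] best = null;
        for (int[] e : elements) {
            if (best == null || e[field] < best[field]) {
                best = e;
            }
        }
        return best;
    }
}
```

For the elements (7, 5), (3, 9), (8, 1) and field 1, min returns just the value 1, whereas minBy returns the whole element (8, 1).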
Window
KeyedStream → WindowedStream

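Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.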

dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
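A tumbling time window divides time into fixed-size, non-overlapping intervals, so each element belongs to exactly one window. The sketch below assumes windows aligned to timestamp 0 with no offset, so a timestamp t falls into the window starting at t - (t mod size). It is a plain-Java model with hypothetical names, not Flink's window assigner.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of tumbling time-window assignment (not Flink's code).
class TumblingWindowModel {

    // Start of the window that contains the timestamp; windows are [start, start + size).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Counts elements per window, keyed by window start.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }
}
```

With a 5-second (5000 ms) window, the timestamps 0, 4999, 5000 and 12000 land in the windows starting at 0, 0, 5000 and 10000.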
WindowAll
DataStream → AllWindowedStream

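Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.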

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
(Window) Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to each window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead of a WindowFunction.

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    @Override
    public void apply(Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t: values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
(Window) Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
(Window) Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value.

windowedStream.fold(new Tuple2<String, Integer>("Sum of all", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Tuple2<String, Integer> value) throws Exception {
        return new Tuple2<String, Integer>(acc.f0, acc.f1 + value.f1);
    }
});
Aggregations on windows
WindowedStream → DataStream

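Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).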

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
Union
DataStream* → DataStream

Union of two or more data streams, creating a new stream that contains all the elements from all the streams.

Note: If you union a data stream with itself, you will still only get each element once.

dataStream.union(otherStream1, otherStream2, ...);
Window Join
DataStream,DataStream → DataStream

Joins two data streams on a given key and a common window.

dataStream.join(otherStream)
    .where(/* key of dataStream */).equalTo(/* key of otherStream */)
    .window(TumblingTimeWindows.of(Time.of(/* window size */, TimeUnit.SECONDS)))
    .apply (new JoinFunction () {...});
Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
    .where(/* key of dataStream */).equalTo(/* key of otherStream */)
    .window(TumblingTimeWindows.of(Time.of(/* window size */, TimeUnit.SECONDS)))
    .apply (new CoGroupFunction () {...});
Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams, retaining their types and allowing for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap, but on a connected data stream:

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
Split
DataStream → SplitStream

Splits the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
Select
SplitStream → DataStream

Selects one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
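The interplay of split and select can be modeled in plain Java: a selector assigns one or more output names to each element, and select keeps the elements that carry any of the requested names. SplitSelectModel is a hypothetical name, and this sketch adds each element at most once per select call; it does not claim to reproduce Flink's internal routing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of split(...) followed by select(...).
class SplitSelectModel {

    // selector plays the role of OutputSelector.select: names the outputs of one element.
    static List<Integer> select(List<Integer> input,
                                Function<Integer, List<String>> selector,
                                String... names) {
        List<Integer> result = new ArrayList<>();
        for (Integer value : input) {
            List<String> outputs = selector.apply(value);
            for (String name : names) {
                if (outputs.contains(name)) {
                    result.add(value);
                    break; // add each element at most once per select call
                }
            }
        }
        return result;
    }

    // Same criterion as the OutputSelector example above.
    static List<String> evenOdd(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }
}
```

For the input 1, 2, 3, 4, selecting "even" yields 2, 4, selecting "odd" yields 1, 3, and selecting both names yields the whole input again.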
Iterate
DataStream → IterativeStream → DataStream

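Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.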

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
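The feedback loop can be simulated with a queue, assuming an iteration body that decrements each value (a hypothetical stand-in for the /*do something*/ step): values still greater than 0 go back to the head of the iteration, and the rest are emitted downstream. IterationModel is a hypothetical helper; it runs in a single thread and ignores how Flink transports feedback records between parallel tasks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical model of a streaming iteration with a feedback edge.
class IterationModel {

    static List<Long> iterate(List<Long> initial, LongUnaryOperator body) {
        Deque<Long> pending = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long value = body.applyAsLong(pending.poll());
            if (value > 0) {
                pending.add(value);   // feedback: back to the head of the iteration
            } else {
                output.add(value);    // forwarded downstream
            }
        }
        return output;
    }
}
```

Starting from 2 and 0 with the body v -> v - 1, the sketch emits -1 (from 0) and then 0 (from 2, after two passes). If the body never drives a value to 0 or below, this loop never terminates, so the body must eventually make every value leave the feedback path.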
Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.

stream.assignTimestamps (new TimestampExtractor() {...});
The same transformations in Scala:

Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that creates a stream of partial sums:

keyedStream.fold(0)(_ + _)
Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply { applyFunction }
Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value.

windowedStream.fold(0)(_ + _)
Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream

Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will still only get each element once.

dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

dataStream.join(otherStream)
    .where(/* key of dataStream */).equalTo(/* key of otherStream */)
    .window(TumblingTimeWindows.of(Time.of(/* window size */, TimeUnit.SECONDS)))
    .apply { ... }
Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
    .where(/* key of dataStream */).equalTo(/* key of otherStream */)
    .window(TumblingTimeWindows.of(Time.of(/* window size */, TimeUnit.SECONDS)))
    .apply {}
Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (num : Int) => Seq(num.toString),
    (str : String) => str.split(" ").toSeq
)
Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
Select
SplitStream → DataStream

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}
Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.

stream.assignTimestamps { timestampExtractor }

The following transformations are available on data streams of Tuples:

Project
DataStream → DataStream

Selects a subset of fields from the tuples.

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
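Projection picks fields by position, and the order of the positions defines the order of the output fields: selecting positions 2 and 0 turns a Tuple3<Integer, Double, String> into a Tuple2<String, Integer>. The sketch below models this with Object[] standing in for Flink tuples; ProjectModel is a hypothetical name.

```java
// Hypothetical model of tuple projection, using Object[] in place of Flink tuples.
class ProjectModel {

    // Builds a new "tuple" from the fields at the given positions, in that order.
    static Object[] project(Object[] tuple, int... fieldIndexes) {
        Object[] result = new Object[fieldIndexes.length];
        for (int i = 0; i < fieldIndexes.length; i++) {
            result[i] = tuple[fieldIndexes[i]];
        }
        return result;
    }
}
```

Projecting (42, 3.5, "x") onto positions 2 and 0 yields ("x", 42).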
In Scala:

Project
DataStream → DataStream

Selects a subset of fields from the tuples.

val in : DataStream[(Int,Double,String)] = // [...]
val out = in.project(2,0)

Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.

Hash partitioning
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream.

dataStream.partitionByHash("someKey");
dataStream.partitionByHash(0);
Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(new Partitioner(){...}, "someKey");
dataStream.partitionCustom(new Partitioner(){...}, 0);
Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.partitionRandom();
Rebalancing (Round-robin partitioning)
DataStream → DataStream

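Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.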

dataStream.rebalance();
Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast();
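How each strategy picks target partitions can be sketched in plain Java. This is a simplified model with hypothetical names: it uses key.hashCode() modulo the number of partitions for hash partitioning and starts round-robin at partition 0. Both are assumptions for illustration and not necessarily what Flink does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java model of how each strategy picks target partitions.
// Not Flink's implementation: Flink's hash function, for example, may differ.
class PartitioningModel {

    // Hash partitioning: equal keys always go to the same partition.
    static int hashPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Random partitioning: uniform choice per element.
    static int randomPartition(Random random, int numPartitions) {
        return random.nextInt(numPartitions);
    }

    // Round-robin (rebalance): cycles through the partitions in order.
    static int[] roundRobin(int numElements, int numPartitions) {
        int[] targets = new int[numElements];
        for (int i = 0; i < numElements; i++) {
            targets[i] = i % numPartitions;
        }
        return targets;
    }

    // Broadcast: every element goes to every partition.
    static List<Integer> broadcastTargets(int numPartitions) {
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);
        }
        return all;
    }
}
```

Round-robin places 6 elements on 3 partitions as 0, 1, 2, 0, 1, 2, i.e. exactly 2 per partition regardless of the keys, which is why rebalance() helps with skew; hash partitioning, by contrast, sends all elements of a hot key to the same partition.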
 
In Scala:

Hash partitioning
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream.

dataStream.partitionByHash("someKey")
dataStream.partitionByHash(0)
Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.partitionRandom()
Rebalancing (Round-robin partitioning)
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

dataStream.rebalance()
Broadcasting
DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast()

Task chaining and resource groups


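Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired: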

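Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().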

A resource group is a slot in Flink; see slots. You can manually isolate operators in separate slots if desired.

Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...);
Disable chaining

Do not chain the map operator.

someStream.map(...).disableChaining();
Start a new resource group

Start a new resource group containing the filter and the subsequent operators.

someStream.filter(...).startNewResourceGroup();
Isolate resources

Isolate the operator in its own slot.

someStream.map(...).isolateResources();
In Scala:

Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining

Do not chain the map operator

someStream.map(...).disableChaining()
Start a new resource group

Start a new resource group containing the filter and the subsequent operators.

someStream.filter(...).startNewResourceGroup()
Isolate resources

Isolate the operator in its own slot.

someStream.map(...).isolateResources()


Specifying Keys

The keyBy transformation requires that a key be defined on the DataStream it is applied to.

It is used as follows:

DataStream<...> input = // [...]
DataStream<...> windowed = input
	.keyBy(/*define key here*/)
	.window(/*define window here*/);

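The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data stream types into keys and values. Keys are "virtual": they are defined as functions over the actual data to guide the grouping operator.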

See the relevant section of the DataSet API documentation on how to specify keys. Just replace DataSet with DataStream, and groupBy with keyBy.

Passing Functions to Flink

Some transformations take user-defined functions as arguments.

See the relevant section of the DataSet API documentation.


Data Types

Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.

See the relevant section of the DataSet API documentation.


Data Sources

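Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.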

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from a java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attaches a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.

In Scala:

Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Seq) - Creates a data stream from a Scala Seq. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data stream from an iterator.

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attaches a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.


Execution Configuration

The StreamExecutionEnvironment provides an ExecutionConfig for setting job-specific configuration values for the runtime.

See the relevant section of the DataSet API documentation.

The settings that are specific to DataStream programs are:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with getAutoWatermarkInterval().


Data Sinks

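Data sinks consume DataStreams and forward them to files, sockets, or external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams: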

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Debugging

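Before running a streaming program on a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing a data analysis program is usually an incremental process of checking results, debugging, and improving.

Flink significantly eases the development of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to use these features.

Local Execution Environment

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalStreamEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalStreamEnvironment is created and used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();

val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()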

Collection Data Sources

Flink provides special data sources that are backed by Java collections to ease testing. Once a program has been tested, these sources and sinks can easily be replaced by sources and sinks that read from or write to external systems.

Collection data sources can be used as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

Note: Currently, collection data sources require that the data types and iterators implement Serializable. Furthermore, collection data sources cannot be executed in parallel.

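Windows

Working with Time

A time window is typically a group of events within a certain time period, so defining a window requires a notion of time. Flink currently supports three different notions of time:

  • Processing time: Processing time is the time of the machine executing the transformation. Processing time is the simplest notion of time and provides the best performance. However, in distributed and asynchronous environments, machine clocks are often inconsistent, so processing time does not provide determinism.

  • Event time: Event time is the time at which each individual event occurred. This time is typically embedded in the records before they enter Flink, or it can be extracted from a field of the record. When using event time, out-of-order events can be handled properly. For example, an event that occurred at minute 10 but only arrives at minute 12 is still processed correctly as an event of minute 10. Event time processing provides predictable results, but incurs more latency, because out-of-order events need to be buffered in memory.

  • Ingestion time: Ingestion time is the time at which an event enters Flink: when the event arrives at its source task, it is assigned the current time of the machine running that task. Ingestion time is more predictable than processing time, and gives lower latencies than event time because it does not depend on external systems. Ingestion time thus provides a middle ground between the two. It can be seen as a special case of event time, and Flink indeed handles ingestion time and event time identically internally.

When working with event time, transformations need to avoid waiting indefinitely for events to arrive. Watermarks provide the mechanism to control the skew between event time and processing time. Watermarks are emitted by the sources. A watermark carries a certain timestamp (a long value, for example one corresponding to 2015-12-03 14:17:30) and denotes that no elements with a lower timestamp will arrive after it.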
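The watermark contract can be illustrated without Flink at all. The following plain-Java sketch assumes a fixed maximum lag of 1000 ms (an illustrative value) and derives the watermark as the largest timestamp seen so far minus that lag; an element whose timestamp is below the current watermark counts as late. The class BoundedLagWatermark and its methods are hypothetical names for this illustration, not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free model of a watermark that trails the largest
 * event timestamp seen so far by a fixed lag. This is not Flink API.
 */
public class BoundedLagWatermark {
    private final long maxLagMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedLagWatermark(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    /** Records an event timestamp and returns the watermark afterwards. */
    public long onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return currentWatermark();
    }

    /** Watermark = largest timestamp seen minus the assumed maximum lag. */
    public long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet: event time has not advanced
        }
        return maxTimestampSeen - maxLagMillis;
    }

    /** An event is late if it is older than the current watermark. */
    public boolean isLate(long eventTimestamp) {
        return eventTimestamp < currentWatermark();
    }

    public static void main(String[] args) {
        BoundedLagWatermark wm = new BoundedLagWatermark(1000);
        long[] timestamps = {1000, 3000, 2500, 1500, 5000};
        List<String> log = new ArrayList<>();
        for (long ts : timestamps) {
            boolean late = wm.isLate(ts); // check against the watermark before this event
            long watermark = wm.onEvent(ts);
            log.add(ts + (late ? " (late)" : "") + " -> watermark " + watermark);
        }
        log.forEach(System.out::println);
    }
}
```

Running main prints one line per event; the event with timestamp 1500 is flagged as late, because the watermark has already advanced to 2000 when it arrives.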

You can select the time semantics you need by calling one of the following:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

The default is TimeCharacteristic.ProcessingTime, so a program with processing time semantics needs no additional setup.

To write a program with event time semantics, take the following steps:

  • 1: Set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).

  • 2: Use DataStream.assignTimestamps(...) to tell Flink how timestamps relate to events (for example, which record field holds the timestamp).

  • 3: In the ExecutionConfig, enable timestamps with enableTimestamps() and set the watermark emission interval with setAutoWatermarkInterval(long milliseconds).

For example, assume that we have a data stream of tuples in which the first field is the timestamp (assigned by the system that produces the events, not by Flink), and we know that the lag between the current processing time and the timestamp of an event is never more than 1 second:

DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        // the first field carries the event timestamp
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        // events lag by at most 1 second, so nothing older than f0 - 1000 ms can still arrive
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        // watermarks are derived per element in extractWatermark instead
        return Long.MIN_VALUE;
    }
});

val stream: DataStream[(Long, Int, Double, String)] = null
stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1
  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000
  override def getCurrentWatermark: Long = Long.MinValue
})

If you know that the timestamps of events are always ascending, i.e., elements arrive in order, you can use the AscendingTimestampExtractor, and the system generates watermarks automatically:

DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0;
    }
});
stream.extractAscendingTimestamp(record => record._1)

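To use ingestion time semantics, you need to:

1: Set env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime).

You can think of this setting as a shortcut for event time in which each source assigns timestamps from the current time of its machine. Because both timestamp assignment and watermark emission happen inside Flink, Flink can infer the remaining settings and configures them automatically.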

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. Each window contains only elements that have the same key.

Basic Window Constructs

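Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. Before defining your own windows, check whether the pre-defined windows below already cover your use case.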

Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "tumbles". Elements are grouped according to their timestamp into groups of 5 seconds duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see above).

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. Elements are grouped according to their timestamp into groups of 5 seconds duration, and an element can belong to more than one window (consecutive windows overlap by 4 seconds). The notion of time is specified by the selected TimeCharacteristic (see above).

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "tumbles". Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window.

keyedStream.countWindow(1000);

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and an element can belong to more than one window (consecutive windows overlap by 900 elements).

keyedStream.countWindow(1000, 100);

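Advanced Window Constructs

The general mechanism can define more powerful windows, at the cost of more verbose syntax. For example, below is a window definition where windows hold the elements of the last 5 seconds and slide every 1 second, but the window function is triggered whenever 100 elements have been added to the window, and each time it is triggered, 10 elements are retained in the window:

keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));

The general recipe for building a custom window is to specify:

(1) a WindowAssigner,

(2) a Trigger (optionally),

(3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows (by time or by count). A window is a logical group of elements that has a begin-value and an end-value, corresponding to a begin-time and an end-time; elements whose timestamp falls within these values belong to the window.

For example, take a sliding time window assigner with a size of 5 seconds and a slide of 1 second, and assume that time is measured in milliseconds and starts at 0. The first windows are then [0, 5000], [1000, 6000], [2000, 7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming element is assigned to windows according to its timestamp, possibly to several of them: an element with timestamp 2000 is assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.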
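The assignment rule in this example can be written down as a small calculation. The sketch below is plain Java with no Flink dependency; it treats windows as half-open intervals [start, start + size) in milliseconds, and it ignores windows that would start before 0 only to reproduce the example above. WindowAssignmentSketch and its methods are hypothetical names, not Flink's WindowAssigner implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch of time-window assignment.
 * Windows are half-open [start, end) intervals in milliseconds.
 */
public class WindowAssignmentSketch {

    /** Start of the single tumbling window containing the timestamp (timestamp >= 0). */
    public static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** All sliding windows [start, start + size) that contain the timestamp. */
    public static List<long[]> slidingWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            if (start < 0) {
                break; // assumption taken from the example: time starts at 0
            }
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : slidingWindows(2000, 5000, 1000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        System.out.println("tumbling start: " + tumblingWindowStart(2000, 1000));
    }
}
```

For timestamp 2000 with a size of 5000 and a slide of 1000, slidingWindows returns the windows starting at 2000, 1000, and 0, matching the three windows named above.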

Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

stream.window(GlobalWindows.create());

Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see above). The window comes with a default trigger: for event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received, whereas for processing time it is triggered when the current processing time exceeds its end-value.

stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));

Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger: for event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received, whereas for processing time it is triggered when the current processing time exceeds its end-value.

stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)));

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated ("fires") for each window. If no trigger is specified, the default trigger of the window type is used. Flink comes bundled with a set of triggers for cases where the default triggers do not fit the application; you can also write your own trigger by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window assigner.

Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(ProcessingTimeTrigger.create());

Watermark trigger

A window is fired when a watermark with a value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

windowedStream.trigger(EventTimeTrigger.create());

Continuous processing time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Continuous watermark time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when a watermark with a value that exceeds the window's end-value has been received. The elements on the triggered window are retained.

windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));

Count trigger

A window is fired when it has received a certain number of elements (1000 below). The elements of the triggered window are retained.

windowedStream.trigger(CountTrigger.of(1000));

Purging trigger

Takes any trigger as an argument and forces the elements of the triggered window to be "purged" (discarded) after triggering.

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));

Delta trigger

A window is fired when the delta between the first element inserted in the window and the last added element, as computed by a DeltaFunction, exceeds a threshold (5000.0 in the example).

windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldDataPoint, Double newDataPoint) {
        return newDataPoint - oldDataPoint;
    }
}));
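The difference between a retaining Count trigger and a Purging trigger is easiest to see by replaying a few elements through a single window. This plain-Java model is a hypothetical sketch, not Flink code: it assumes the trigger counts elements added since its last firing and restarts the count after each firing, and it uses a sum as the window function.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free model of a count-based trigger on one window.
 * It only mimics the retain-vs-purge behavior described above; not Flink API.
 */
public class CountTriggerSketch {

    /**
     * Feeds the elements into one window and fires every `count` new elements.
     * Returns the sum computed at each firing. With purge=false the window keeps
     * its elements (Count trigger); with purge=true they are discarded after each
     * firing (Count trigger wrapped in a Purging trigger).
     */
    public static List<Integer> firedSums(int count, boolean purge, int... elements) {
        List<Integer> window = new ArrayList<>();
        List<Integer> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : elements) {
            window.add(element);
            sinceLastFire++;
            if (sinceLastFire < count) {
                continue; // the trigger does not fire yet
            }
            sinceLastFire = 0; // assumed: the count restarts after each firing
            int sum = 0;
            for (int v : window) {
                sum += v; // the window function, here a sum
            }
            sums.add(sum);
            if (purge) {
                window.clear(); // purging: drop the window contents after firing
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println("retained: " + firedSums(3, false, 1, 2, 3, 4, 5, 6));
        System.out.println("purged:   " + firedSums(3, true, 1, 2, 3, 4, 5, 6));
    }
}
```

With a count of 3 and the elements 1 to 6, the retaining variant computes its second sum over all six elements, while the purging variant only sees 4, 5, and 6.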

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor can remove elements from the window. Flink comes bundled with a set of evictors. You can write your own evictor by implementing the Evictor interface.

Time evictor

Evicts elements from the beginning of the window, so that only the elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).

windowedStream.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)));

Count evictor

Retains 1000 elements from the end of the window backwards, evicting all others.

windowedStream.evictor(CountEvictor.of(1000));

Delta evictor

Starting from the beginning of the window, evicts elements until it finds one whose delta to the last element, as computed by a DeltaFunction, is below a threshold (5 in the example).

windowedStream.evictor(DeltaEvictor.of(5.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldValue, Double newValue) {
        return newValue - oldValue;
    }
}));
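The Count and Time evictors can be modeled as simple list operations. In this hypothetical plain-Java sketch (EvictorSketch is not Flink API), the time evictor follows the description above and measures the retained span back from the window's end-value; elements are represented by their timestamps only, and both methods evict only from the beginning of the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, dependency-free models of the Count and Time evictors. */
public class EvictorSketch {

    /** Count evictor model: keep the last `keep` elements, evict the rest from the beginning. */
    public static <T> List<T> keepLast(List<T> window, int keep) {
        int from = Math.max(0, window.size() - keep);
        return new ArrayList<>(window.subList(from, window.size()));
    }

    /**
     * Time evictor model: starting from the beginning of the window, evict elements
     * older than (windowEnd - keepMillis); everything from there on is retained.
     */
    public static List<Long> evictOlderThan(List<Long> timestamps, long windowEnd, long keepMillis) {
        long cutoff = windowEnd - keepMillis;
        int i = 0;
        while (i < timestamps.size() && timestamps.get(i) < cutoff) {
            i++; // evict from the beginning only
        }
        return new ArrayList<>(timestamps.subList(i, timestamps.size()));
    }

    public static void main(String[] args) {
        System.out.println(keepLast(Arrays.asList(1, 2, 3, 4, 5), 2));
        System.out.println(evictOlderThan(Arrays.asList(0L, 2500L, 4200L, 4800L), 5000, 1000));
    }
}
```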

Recipes for Building Windows

The window assigner, trigger, and evictor mechanisms are very powerful and let you define many different kinds of windows. Flink's basic window constructs are in fact syntactic sugar on top of these three mechanisms. Below you can find how some common windows are constructed with the general mechanism; each recipe shows the shorthand first, followed by its equivalent construction. The event-time and processing-time recipes assume that the corresponding TimeCharacteristic is set on the environment.

Tumbling count window

stream.countWindow(1000)
stream.window(GlobalWindows.create())
  .trigger(CountTrigger.of(1000))
  .evictor(CountEvictor.of(1000))

Sliding count window

stream.countWindow(1000, 100)
stream.window(GlobalWindows.create())
  .trigger(CountTrigger.of(100))
  .evictor(CountEvictor.of(1000))

Tumbling event time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS))
stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .trigger(EventTimeTrigger.create())

Sliding event time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(EventTimeTrigger.create())

Tumbling processing time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS))
stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .trigger(ProcessingTimeTrigger.create())

Sliding processing time window

stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(ProcessingTimeTrigger.create())
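To see why the count-window recipes produce tumbling or sliding behavior, the following plain-Java model replays them for a single key: one buffer stands for the global window, a counter plays the role of CountTrigger.of(slide), and trimming from the front plays the role of CountEvictor.of(size). CountWindowRecipeSketch is a hypothetical illustration, not Flink code, and it uses small sizes so the output stays readable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a global window + count trigger + count evictor for one key. */
public class CountWindowRecipeSketch {

    /** Returns the contents handed to the window function at each firing. */
    public static List<List<Integer>> countWindows(int size, int slide, int... elements) {
        List<Integer> buffer = new ArrayList<>();       // the single global window
        List<List<Integer>> fired = new ArrayList<>();
        int sinceLastFire = 0;
        for (int element : elements) {
            buffer.add(element);
            if (++sinceLastFire < slide) {
                continue;                               // count trigger not reached yet
            }
            sinceLastFire = 0;
            while (buffer.size() > size) {
                buffer.remove(0);                       // count evictor: evict from the beginning
            }
            fired.add(new ArrayList<>(buffer));         // snapshot of the window contents
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("sliding:  " + countWindows(4, 2, 1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println("tumbling: " + countWindows(4, 4, 1, 2, 3, 4, 5, 6, 7, 8));
    }
}
```

With size 4 and slide 2, the first firings contain fewer than four elements because the buffer has not filled up yet; with size and slide both 4, every element appears in exactly one fired window.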

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. Such a windowed stream contains all elements of the stream, but is evaluated in a single task (i.e., on a single computing node). The syntax for defining triggers and evictors is exactly the same:

nonKeyedStream
    .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));

The basic window definitions are also available for windows on non-keyed streams:

Tumbling time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "tumbles". Elements are grouped according to their timestamp into groups of 5 seconds duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS));

Sliding time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. Elements are grouped according to their timestamp into groups of 5 seconds duration, and an element can belong to more than one window (consecutive windows overlap by 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment.

nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));

Tumbling count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "tumbles". Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window.

nonKeyedStream.countWindowAll(1000);

Sliding count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. Elements are grouped according to their arrival time (equivalent to processing time) into groups of 1000 elements, and an element can belong to more than one window (consecutive windows overlap by 900 elements).

nonKeyedStream.countWindowAll(1000, 100);

Execution Parameters

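Fault Tolerance

Flink has a checkpointing mechanism that recovers streaming jobs after failures. The mechanism requires a persistent data source whose records can be read again later when needed.

The checkpointing mechanism stores the progress in the data sources as well as the user-defined state, in order to provide exactly-once processing semantics.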
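Why the source progress and the user-defined state must be checkpointed together can be shown with a toy model. The plain-Java sketch below is hypothetical and does not reflect how Flink takes checkpoints internally: a source with replayable offsets feeds a counter, a checkpoint records the offset and the counter at the same point, and after a failure the source is rewound to the checkpointed offset.

```java
/**
 * Hypothetical toy model, not Flink's checkpointing implementation:
 * a replayable source feeding a counting operator.
 */
public class CheckpointReplaySketch {

    /**
     * Processes records 0..n-1, checkpoints just before record `checkpointAt`,
     * fails just before record `failAt`, then recovers and processes to the end.
     * If restoreState is true, the counter is rolled back together with the offset.
     */
    public static long countAfterRecovery(int n, int checkpointAt, int failAt, boolean restoreState) {
        long counter = 0;
        int savedOffset = 0;   // no checkpoint yet: replay from the start
        long savedCounter = 0;

        // first attempt, interrupted by a failure
        for (int offset = 0; offset < failAt; offset++) {
            if (offset == checkpointAt) {
                savedOffset = offset;   // source progress
                savedCounter = counter; // user-defined state, captured at the same point
            }
            counter++;
        }

        // recovery: rewind the source, and optionally restore the state
        if (restoreState) {
            counter = savedCounter;
        }
        for (int offset = savedOffset; offset < n; offset++) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("state restored:     " + countAfterRecovery(10, 4, 7, true));
        System.out.println("state not restored: " + countAfterRecovery(10, 4, 7, false));
    }
}
```

With 10 records, a checkpoint before record 4, and a failure before record 7, restoring the counter yields 10, the same as a failure-free run. Rewinding the source without restoring the counter yields 13, because records 4 to 6 are counted twice.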

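To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is enabled but this value is not explicitly set, the job is restarted infinitely often.
  • Exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications; at-least-once may be better for certain applications that need consistently very low latency (a few milliseconds).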

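The docs on streaming fault tolerance describe Flink's fault-tolerance mechanism in detail.

Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. This is currently the case for the Kafka source (and some built-in data generators). The following list gives the state update guarantees of Flink coupled with the bundled sources:

  • Apache Kafka: exactly once. Use the appropriate Kafka connector for your version.
  • RabbitMQ: at most once.
  • Twitter Streaming API: at most once.
  • Collections: at most once.
  • Files: at least once. At failure, the file will be read from the beginning.
  • Sockets: at most once.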

To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state updates), the data sink needs to take part in the snapshotting mechanism. The following list gives the delivery guarantees (assuming exactly-once state updates) of Flink coupled with the bundled sinks:

  • HDFS rolling sink: exactly once. Implementation depends on Hadoop version.
  • Elasticsearch: at least once.
  • Kafka producer: at least once.
  • File sinks: at least once.
  • Socket sinks: at least once.
  • Standard output: at least once.

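Parallelism

You can control the number of parallel instances of an operator by calling setParallelism(int) on it when the operator is created.

Controlling Latency

By default, elements are not transferred over the network one by one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are what is actually transferred between machines) can be set in the Flink configuration file. While this approach is good for throughput, it can cause latency problems when the incoming stream is slow. To control throughput and latency, you can call env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, a buffer is sent automatically even if it is not full. The default timeout is 100 ms.

Usage:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)

To maximize throughput, call setBufferTimeout(-1), which removes the timeout; buffers are then only sent when they are full. To minimize latency, set the timeout to a value close to 0 (for example, 5 or 10 ms). Avoid a buffer timeout of 0, because it can cause severe performance degradation.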
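The trade-off between throughput and latency can be simulated with a toy model of one output buffer. In this hypothetical plain-Java sketch (not Flink's network stack), a buffer is sent when it holds capacity records or when the timeout has elapsed since its oldest record arrived; a timeout of 0 sends after every record, and a negative timeout disables time-based sending. Measuring the timeout from the oldest buffered record is a simplifying assumption.

```java
import java.util.Arrays;

/** Hypothetical toy model of output buffering with a timeout; not Flink code. */
public class BufferTimeoutSketch {

    /** Returns, for each record, the time its buffer is sent; -1 means still waiting. */
    public static long[] sendTimes(long[] arrivals, int capacity, long timeoutMillis) {
        long[] sent = new long[arrivals.length];
        Arrays.fill(sent, -1L);
        int start = 0; // index of the oldest record not yet sent
        for (int i = 0; i < arrivals.length; i++) {
            // timeout flush: the pending buffer's deadline expired before record i arrived
            if (timeoutMillis > 0 && start < i) {
                long deadline = arrivals[start] + timeoutMillis;
                if (deadline <= arrivals[i]) {
                    for (int j = start; j < i; j++) {
                        sent[j] = deadline;
                    }
                    start = i;
                }
            }
            // size flush, or an immediate flush when the timeout is 0
            if (i - start + 1 >= capacity || timeoutMillis == 0) {
                for (int j = start; j <= i; j++) {
                    sent[j] = arrivals[i];
                }
                start = i + 1;
            }
        }
        // records left in a partially filled buffer go out at its deadline, if there is one
        if (timeoutMillis > 0) {
            for (int j = start; j < arrivals.length; j++) {
                sent[j] = arrivals[start] + timeoutMillis;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        long[] slowStream = {0, 10, 20, 200};
        System.out.println(Arrays.toString(sendTimes(slowStream, 100, 100))); // 100 ms timeout
        System.out.println(Arrays.toString(sendTimes(slowStream, 100, -1)));  // timeout disabled
        System.out.println(Arrays.toString(sendTimes(slowStream, 100, 0)));   // send every record
    }
}
```

For a slow stream of four records, a 100 ms timeout bounds the wait of every record, while disabling the timeout leaves all four waiting (-1) for a buffer that never fills.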

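Working with State

All transformations in Flink may look like functions (in functional processing terminology), but they are in fact stateful operators. You can make every transformation (map, filter, etc.) stateful by declaring local variables or by using Flink's state interface. You can register any local variable as managed state by implementing an interface. In this case, and also when using Flink's native state interface, Flink automatically takes snapshots of the state of your transformations and restores it in case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and under execution with failures.

First, we look at how to keep local variables consistent under failures, and then we look at Flink's state interface.

By default, state checkpoints are stored in memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (such as HDFS or S3), which can be configured in flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…).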

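Checkpointing Local Variables

Local variables can be checkpointed by using the Checkpointed interface.

If a user-defined function implements the Checkpointed interface, its snapshotState(...) and restoreState(...) methods are called to store and restore the function state.

In addition, user-defined functions can implement the CheckpointNotifier interface to be notified of completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that Flink does not guarantee that a user function receives the notification if a failure happens between the completion of a checkpoint and the notification. A missed notification should therefore be treated as covered by the notification of a later checkpoint.

For example, a reduce function that counts its invocations and checkpoints the counter through the Checkpointed interface looks like this:

public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

    // persistent counter
    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter++;
        return value1 + value2;
    }

    // regularly persists state during normal operation
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}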

Using the Key/Value State Interface

The state interface gives access to key/value states. Because the state is partitioned by key, it can only be used on a KeyedStream, created via stream.keyBy(...).

The handle to the state can be obtained from the function’s RuntimeContext. The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.

The following code sample shows how to use the key/value state inside a reduce function. When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).

public class CounterSum extends RichReduceFunction<Long> {

    /** The state handle */
    private OperatorState<Long> counter;

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        // value() and update() act on the state of the current key
        counter.update(counter.value() + 1);
        return value1 + value2;
    }

    @Override
    public void open(Configuration config) {
        // name, type, and default value for keys without a value yet
        counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}

State updated this way is usually kept locally inside the Flink process (unless one explicitly configures an external state backend). This means that lookups and updates are process-local and thus very fast.

The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy() function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.

The Scala API has shortcuts for stateful map() or flatMap() functions on KeyedStream, which pass the state of the current key as an Option directly into the function and return the result together with a state update:

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
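For readers more familiar with Java, the following plain-Java sketch reproduces what the mapWithState example above computes for a sequence of (key, value) pairs. It is a hypothetical model, not Flink API: a HashMap stands in for the per-key state, and a missing entry plays the role of None.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of the mapWithState counting example; not Flink API. */
public class MapWithStateSketch {

    /** Processes (key, value) pairs in order and returns the emitted (key, count) pairs. */
    public static List<String> run(String[] keys, int[] values) {
        Map<String, Integer> state = new HashMap<>(); // a missing entry plays the role of None
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            Integer count = state.get(key);
            if (count != null) {
                // case Some(c) => ((key, c), Some(c + value))
                out.add("(" + key + "," + count + ")");
                state.put(key, count + values[i]);
            } else {
                // case None => ((key, 0), Some(value))
                out.add("(" + key + ",0)");
                state.put(key, values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[] {"a", "a", "b", "a"}, new int[] {1, 2, 5, 3}));
    }
}
```

Note that each output carries the count accumulated before the current element, because the Some(c) branch emits c and only then stores c + in._2.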

Stateful Source Functions

Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

public static class CounterSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {

    /** current offset for exactly once semantics */
    private long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return offset;
    }

    @Override
    public void restoreState(Long state) {
        offset = state;
    }
}
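The reason for taking the checkpoint lock can be illustrated with plain Java threads. In this hypothetical sketch (not Flink API), a list stands in for ctx.collect(...), the source emits records and advances its offset under one lock, and snapshots are taken under the same lock from another thread, so a snapshot never observes an emitted record without the matching offset update, or the reverse. The record limit exists only so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of the checkpoint-lock pattern; not Flink API. */
public class CheckpointLockSketch {
    private final Object lock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // stands in for ctx.collect(...)
    private long offset = 0;                              // the checkpointed state
    private volatile boolean isRunning = true;

    /** Source loop: emitting a record and advancing the offset happen under one lock. */
    public void run(long maxRecords) {
        while (isRunning) {
            synchronized (lock) {
                if (offset >= maxRecords) {
                    return; // bounded only so the demo terminates
                }
                emitted.add(offset);
                offset += 1;
            }
        }
    }

    public void cancel() {
        isRunning = false;
    }

    /** Snapshot under the same lock: returns {offset, records emitted so far}. */
    public long[] snapshot() {
        synchronized (lock) {
            return new long[] {offset, emitted.size()};
        }
    }

    /** Takes snapshots while the source runs in another thread; true if all are consistent. */
    public static boolean snapshotsConsistent(int snapshots, long maxRecords) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread sourceThread = new Thread(() -> source.run(maxRecords));
        sourceThread.start();
        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= s[0] == s[1]; // offset matches exactly what was emitted
        }
        source.cancel();
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(snapshotsConsistent(1000, 100_000));
    }
}
```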

Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier interface.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing:env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Iterations

Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream

IterativeStream<Integer> iteration = input.iterate();

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back from the part of the stream which is propagated forward. These filters can, e.g., define the “termination” logic, where an element is allowed to propagate downstream rather than being fed back.

iteration.closeWith(iterationBody.filter(/* one part of the stream */)); DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});
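The data flow of this countdown program can be traced with a single-threaded simulation. This plain-Java sketch is hypothetical and not Flink code: a queue stands in for the iteration head, values still greater than zero are fed back, and all other values leave the loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded simulation of the countdown iteration; not Flink API. */
public class CountdownIterationSketch {

    /** Returns the elements that leave the loop, in emission order. */
    public static List<Long> run(long... inputs) {
        Deque<Long> head = new ArrayDeque<>();  // iteration head: inputs plus feedback
        for (long v : inputs) {
            head.add(v);
        }
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long minusOne = head.poll() - 1;    // iteration body (the map step)
            if (minusOne > 0) {
                head.add(minusOne);             // fed back, like closeWith(stillGreaterThanZero)
            } else {
                output.add(minusOne);           // leaves the loop (value <= 0)
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(0, 1, 2, 3));
    }
}
```

For the inputs 0 to 3, run returns [-1, 0, 0, 0]: the map step runs at least once for every element, so an input of 0 leaves the loop as -1, which is why the exit filter uses value <= 0.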

In the Scala API, iterative streaming programs likewise implement a step function, which is passed to iterate(). The step function receives the IterativeStream and returns a pair of streams: the first is fed back to the iteration head, and the second is forwarded downstream. As a DataStream program may never finish, there is no maximum number of iterations. Here, the body (the part of the computation that is repeated) is a simple map transformation, and filters distinguish the elements that are fed back from the elements that are forwarded downstream.

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

Here is the same program in Scala, continuously subtracting 1 from a series of integers until they reach zero:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

Connectors

Connectors provide code for interfacing with various third-party systems.

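Currently these systems are supported:

  • Apache Kafka (source and sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source and sink)
  • Twitter Streaming API (source)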

To run an application using one of these connectors, additional third-party components usually have to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers encapsulating these services are also provided to help users get started with connectors.

Apache Kafka

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanisms to provide different processing guarantees (most importantly exactly-once guarantees).

For exactly-once processing, Flink cannot rely on the auto-commit capabilities of the Kafka consumers, because the Kafka consumer might commit offsets to Kafka for records that have not yet been processed successfully.

Please pick a package (Maven artifact id) and class name for your use case and environment. For most users, the flink-connector-kafka-083 package and the FlinkKafkaConsumer082 class are appropriate.

| Maven artifact | Flink versions | Class name | Kafka version | Guarantees | Notes |
|---|---|---|---|---|---|
| flink-connector-kafka | 0.9, 0.10 | KafkaSource | 0.8.1, 0.8.2 | Does not participate in checkpointing (no consistency guarantees) | Uses the old, high-level KafkaConsumer API; autocommits to ZK via Kafka |
| flink-connector-kafka | 0.9, 0.10 | PersistentKafkaSource | 0.8.1, 0.8.2 | Does not guarantee exactly-once processing, element order, or strict partition assignment | Uses the old, high-level KafkaConsumer API; offsets are committed into ZK manually |
| flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer081 | 0.8.1 | Guarantees exactly-once processing | Uses Kafka's SimpleConsumer API internally; offsets are committed to ZK manually |
| flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer082 | 0.8.2 | Guarantees exactly-once processing | Uses Kafka's SimpleConsumer API internally; offsets are committed to ZK manually |

Then, import the connector in your maven project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka’s quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • On 32-bit computers, Kafka may fail to start with an “Unrecognized VM option 'UseCompressedOops'” error.
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine’s IP address.

Kafka Consumer

The standard FlinkKafkaConsumer082 is a Kafka consumer providing access to one topic.

The following parameters have to be provided for the FlinkKafkaConsumer082(...) constructor:

  1. The topic name
  2. A DeserializationSchema
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma-separated list of Kafka brokers)
    • “zookeeper.connect” (comma-separated list of Zookeeper servers)
    • “group.id” (the id of the consumer group)

Example:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
	.addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
stream.print();

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
stream.print()
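One way to catch a missing required property early is to check the Properties object before constructing the consumer. The helper below is a hypothetical sketch using only java.util.Properties; KafkaConsumerProps and missing() are not part of Flink, and the key list simply mirrors the three properties listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not part of Flink: lists which of the
// consumer properties documented above as required are absent or blank.
class KafkaConsumerProps {
    static final List<String> REQUIRED =
            Arrays.asList("bootstrap.servers", "zookeeper.connect", "group.id");

    static List<String> missing(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        System.out.println(missing(properties));  // prints [group.id]
    }
}
```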

Kafka Consumers and Fault Tolerance

As Kafka persists all the data, a fault-tolerant Kafka consumer can be provided.

The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will continue reading from where it left off (more precisely, from the offsets stored in the last completed checkpoint) after a restart. For example, if the last checkpoint recorded offsets 31, 122 and 110 for the 3 partitions of the topic, then after a restart the source will continue reading from those offsets, no matter whether these partitions have new messages.
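The following plain-Java model illustrates the idea with maps; OffsetCheckpointModel is hypothetical and much simpler than Flink's implementation. The source remembers the last offset read per partition, a checkpoint copies that map, and a restart restores the copy, so records read after the last checkpoint are read again.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (not Flink's implementation) of checkpointed Kafka offsets.
class OffsetCheckpointModel {
    private final Map<Integer, Long> lastRead = new HashMap<>();  // partition -> last offset read
    private Map<Integer, Long> checkpointed = new HashMap<>();     // offsets in the last checkpoint

    void read(int partition, long offset) {
        lastRead.put(partition, offset);
    }

    void checkpoint() {
        checkpointed = new HashMap<>(lastRead);  // snapshot, unaffected by later reads
    }

    // Called after a failure: progress made after the last checkpoint is discarded.
    Map<Integer, Long> restore() {
        lastRead.clear();
        lastRead.putAll(checkpointed);
        return new HashMap<>(lastRead);
    }
}
```

Replaying from the checkpointed offsets is what allows the source's position to stay consistent with the rest of the job's state, which is restored from the same checkpoint.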

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing();

Also note that Flink can only restart the topology if enough processing slots are available. So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

Kafka Sink

A class providing an interface for sending data to Kafka.

The following arguments have to be provided for the KafkaSink(…) constructor in order:

  1. Broker address (in hostname:port format, can be a comma separated list)
  2. The topic name
  3. Serialization schema

Example:

stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
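The broker address argument accepts a comma-separated list in hostname:port format. The hypothetical helper below (BrokerList.parse is not part of Flink or Kafka) shows how such a string breaks down into individual brokers and rejects entries without a port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink or Kafka) that splits a broker
// address string in "hostname:port[,hostname:port...]" form into its parts.
class BrokerList {
    static final class Broker {
        final String host;
        final int port;

        Broker(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static List<Broker> parse(String brokers) {
        List<Broker> result = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Reject entries without a host or without a port after the colon.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected hostname:port, got '" + trimmed + "'");
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1));  // throws if not numeric
            result.add(new Broker(trimmed.substring(0, colon), port));
        }
        return result;
    }
}
```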

The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:

public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,       SerializationSchema<IN, byte[]> serializationSchema)
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,       SerializationSchema serializationSchema)

If this constructor is used, the user needs to make sure to set the broker(s) with the “metadata.broker.list” property. Also, the serializer configuration should be left at its default; serialization should be set via the SerializationSchema instead.

The Apache Kafka official documentation can be found here.

Elasticsearch

This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name: it must be set when creating a Sink for writing to your cluster.

Elasticsearch Sink

The connector provides a Sink that can send data to an Elasticsearch Index.

The sink can use two different methods for communicating with Elasticsearch:

  1. An embedded Node
  2. The TransportClient

See here for information about the differences between the two modes.

This code shows how to create a sink that uses an embedded Node for communication:

DataStream<String> input = ...;
Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));

import java.util
val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))

Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter that must correspond to the name of your cluster.

Internally, the sink uses a BulkProcessor to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the BulkProcessor can be configured using these config keys:

  • bulk.flush.max.actions: Maximum number of elements to buffer
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
  • bulk.flush.interval.ms: Interval, in milliseconds, at which to flush data regardless of the other two settings
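The three flush triggers can be pictured with the simplified buffer below. BulkBuffer is a hypothetical class, not Elasticsearch's BulkProcessor, and it only checks the interval when an element arrives, whereas a real implementation flushes on a timer. With a maximum of one action, as in the example configuration, every element triggers its own flush.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the three flush triggers listed above; NOT Elasticsearch's BulkProcessor.
class BulkBuffer {
    private final int maxActions;      // like bulk.flush.max.actions
    private final long maxBytes;       // like bulk.flush.max.size.mb, converted to bytes
    private final long intervalMs;     // like bulk.flush.interval.ms
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMs;
    final List<List<String>> flushed = new ArrayList<>();  // batches "sent" so far

    BulkBuffer(int maxActions, long maxSizeMb, long intervalMs, long startMs) {
        this.maxActions = maxActions;
        this.maxBytes = maxSizeMb * 1024 * 1024;
        this.intervalMs = intervalMs;
        this.lastFlushMs = startMs;
    }

    void add(String element, long nowMs) {
        buffer.add(element);
        bufferedBytes += element.getBytes(StandardCharsets.UTF_8).length;
        boolean tooMany = buffer.size() >= maxActions;
        boolean tooBig = bufferedBytes >= maxBytes;
        boolean tooOld = nowMs - lastFlushMs >= intervalMs;  // checked only on arrival
        if (tooMany || tooBig || tooOld) {
            flush(nowMs);
        }
    }

    void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));  // one bulk request per flush
        }
        buffer.clear();
        bufferedBytes = 0;
        lastFlushMs = nowMs;
    }
}
```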

This example code does the same, but with a TransportClient:

DataStream<String> input = ...;
Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
List<TransportAddress> transports = new ArrayList<>();
// 9300 is the default Elasticsearch transport port
transports.add(new InetSocketTransportAddress("node-1", 9300));
transports.add(new InetSocketTransportAddress("node-2", 9300));
input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));

import java.util
val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
val transports = new util.ArrayList[TransportAddress]
// 9300 is the default Elasticsearch transport port
transports.add(new InetSocketTransportAddress("node-1", 9300))
transports.add(new InetSocketTransportAddress("node-2", 9300))
input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))

The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient.

More information about Elasticsearch can be found here.

Hadoop FileSystem

This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Rolling File Sink

Both the rolling behaviour and the writing can be configured, as described below. This is how you can create a default rolling sink:

DataStream<String> input = ...;
input.addSink(new RollingSink<String>("/base/path"));

val input: DataStream[String] = ...
input.addSink(new RollingSink[String]("/base/path"))

The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.

By default, the rolling sink will use the pattern "yyyy-MM-dd--HH" to name the rolling buckets. This pattern is passed to SimpleDateFormat with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity, you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file, and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer, use setBucketer() on a RollingSink.
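The bucket naming described above can be reproduced with SimpleDateFormat directly. This is a sketch of the idea, not the bucketer's source: BucketPath and bucketFor are hypothetical names, and the time zone is pinned to UTC only so the output is reproducible, whereas the paragraph above refers to the current system time.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Derives a bucket directory by formatting a timestamp with a SimpleDateFormat pattern.
// UTC is an assumption made only for reproducible output.
class BucketPath {
    static String bucketFor(String basePath, String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return basePath + "/" + format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long t = 1444570800000L;  // 2015-10-11 13:40:00 UTC
        System.out.println(bucketFor("/base/path", "yyyy-MM-dd--HH", t));    // /base/path/2015-10-11--13
        System.out.println(bucketFor("/base/path", "yyyy-MM-dd--HHmm", t));  // /base/path/2015-10-11--1340
    }
}
```

Because the path only changes when the formatted string changes, all elements within the same hour map to the same bucket with the default pattern, while the "HHmm" pattern starts a new bucket every minute.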

The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newlines. To specify a custom writer, use setWriter() on a RollingSink. If you want to write Hadoop SequenceFiles, you can use the provided SequenceFileWriter, which can also be configured to use compression.

The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).
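The batch size is given in bytes, as the 400 MB comment in the example below suggests. The sketch shows that arithmetic together with a simplified roll-over rule, assumed here to be: before writing a record, a part file that has reached the batch size is closed and a new one is started. PartFileRolling is hypothetical and is not RollingSink's actual code.

```java
// Batch-size arithmetic plus a simplified, assumed roll-over rule.
// This follows the description above; it is NOT RollingSink's actual code.
class PartFileRolling {
    static final long DEFAULT_BATCH_SIZE = 384L * 1024 * 1024;  // 384 MB = 402653184 bytes
    static final long EXAMPLE_BATCH_SIZE = 1024L * 1024 * 400;  // 400 MB = 419430400 bytes

    // Number of part files one parallel sink instance would write for these record sizes.
    static int partFilesFor(long[] recordSizes, long batchSize) {
        if (recordSizes.length == 0) {
            return 0;
        }
        int parts = 1;
        long currentSize = 0;
        for (long size : recordSizes) {
            if (currentSize >= batchSize) {
                parts++;          // roll over to a new part file
                currentSize = 0;
            }
            currentSize += size;
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(EXAMPLE_BATCH_SIZE);                             // 419430400
        System.out.println(partFilesFor(new long[] {100, 100, 100}, 150));  // 2
    }
}
```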

Example:

DataStream<Tuple2<IntWritable, Text>> input = ...;
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
input.addSink(sink);

val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
input.addSink(sink)

This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}

Here, date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance, and count is the running number of part files that were created because of the batch size.

For in-depth information, please refer to the JavaDoc for RollingSink.

RabbitMQ

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

A class providing an interface for receiving data from RabbitMQ.

The following have to be provided for the RMQSource(…) constructor, in order:

  1. The hostname
  2. The queue name
  3. Deserialization schema

Example:

DataStream<String> stream = env
	.addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));
stream.print();

val stream = env
    .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
stream.print()

RabbitMQ Sink

A class providing an interface for sending data to RabbitMQ.

The following have to be provided for the RMQSink(…) constructor, in order:

  1. The hostname
  2. The queue name
  3. Serialization schema

Example:

stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
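Both RabbitMQ connectors convert between stream elements and the raw bytes carried by messages: the source through a deserialization schema, the sink through a serialization schema. The sketch below mirrors those two roles with hypothetical interfaces (ToBytes and FromBytes are not Flink types, and Flink's real interfaces differ in detail). UTF-8 is an assumption; the encoding used by SimpleStringSchema and StringToByteSerializer is not stated here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interfaces mirroring the roles of the serialization schema
// (used by RMQSink) and the deserialization schema (used by RMQSource).
interface ToBytes<T> {
    byte[] serialize(T element);
}

interface FromBytes<T> {
    T deserialize(byte[] message);
}

// One class covering both directions for plain strings, encoded as UTF-8 (assumed).
class Utf8StringSchema implements ToBytes<String>, FromBytes<String> {
    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Using the same encoding on both ends is what lets a message written by the sink be read back unchanged by a source on the same queue.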

More about RabbitMQ can be found here.

Twitter Streaming API

The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource class for establishing a connection to this stream. To use this connector, add the following dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.

Authentication

In order to connect to the Twitter stream, the user has to register their program and acquire the necessary authentication information. The process is described below.

Acquiring the authentication information

First of all, a Twitter account is needed. Sign up for free at twitter.com/signup, or sign in at Twitter’s Application Management and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called consumerKey and consumerSecret in TwitterSource, respectively) are located on the “API Keys” tab. The necessary access token data (token and secret) can be acquired here. Remember to keep these pieces of information secret and do not push them to public repositories.

Accessing the authentication information

Create a properties file, and pass its path in the constructor of TwitterSource. The content of the file should be similar to this:

#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***
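Because this is a standard properties file, it can be checked with java.util.Properties before starting a job. TwitterAuthFile is a hypothetical helper; it does not reproduce TwitterSource's own loading or validation, and it only looks for the four keys shown above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: loads an authentication file like the one above and
// lists which of its four keys are missing or empty.
class TwitterAuthFile {
    static final String[] KEYS = {"secret", "consumerSecret", "token", "consumerKey"};

    static List<String> missingKeys(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);      // lines starting with '#' are treated as comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : KEYS) {
            if (props.getProperty(key, "").trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

In practice, the Reader would be a FileReader on the same path that is passed to the TwitterSource constructor.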

Constructors

The TwitterSource class has two constructors.

  1. public TwitterSource(String authPath, int numberOfTweets); to emit a finite number of tweets
  2. public TwitterSource(String authPath); for streaming

Both constructors expect a String authPath argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets determines how many tweets the source emits.

Usage

In contrast to other connectors, the TwitterSource does not depend on any additional services. For example, the following code should run gracefully:

DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
val streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))

The TwitterSource emits strings containing JSON code. To retrieve information from the JSON code, you can add a FlatMap or a Map function that handles JSON. For example, the examples include an abstract class, JSONParseFlatMap, which extends FlatMapFunction and provides the following function for acquiring the value of a given field:

String getField(String jsonText, String field);
getField(jsonText : String, field : String) : String
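For a quick experiment without a JSON library, a naive stand-in for such a getField function can be written with a regular expression. NaiveJsonField is hypothetical and is NOT the JSONParseFlatMap implementation: it returns the first string value stored under the given key anywhere in the text, leaves escape sequences undecoded, and ignores non-string values. Real tweets, with nested objects, should be handled with a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Naive, hypothetical stand-in for getField(jsonText, field); not real JSON parsing.
class NaiveJsonField {
    static String getField(String jsonText, String field) {
        // Matches "field" : "value", where value may contain escaped characters such as \"
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"((?:[^\"\\\\]|\\\\.)*)\"");
        Matcher m = p.matcher(jsonText);
        return m.find() ? m.group(1) : null;   // raw value, escapes not decoded
    }
}
```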

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type carries delete information.

Example

TwitterLocal is an example of how to use TwitterSource. It implements a language frequency counter program.
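The core of a language frequency counter is a count per key. The plain-Java sketch below assumes the "lang" value has already been extracted from each tweet; LanguageFrequency is hypothetical and is not the TwitterLocal code. In a Flink job, a similar result could come from a keyBy on the language followed by a sum, as in the WindowWordCount example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: counts how often each language code occurs.
class LanguageFrequency {
    static Map<String, Integer> count(List<String> languages) {
        Map<String, Integer> counts = new TreeMap<>();  // sorted keys for stable output
        for (String lang : languages) {
            counts.merge(lang, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("en", "de", "en", "ja", "en")));
        // prints {de=1, en=3, ja=1}
    }
}
```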

Docker containers for connectors

A Docker container is provided with all the required configurations for test-running the connectors of Apache Flink. The servers for the message queues run in the Docker container, while the example topology can be run on the user’s computer.

Installing Docker

The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.

Creating a jar with all the dependencies

For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.

cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly

This creates an assembly jar under flink-streaming-connectors/target.

RabbitMQ

Pull the docker image:

sudo docker pull flinkstreaming/flink-connectors-rabbitmq

To run the container, type:

sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq

Now a terminal has started running from the image with all the necessary configurations to test-run the RabbitMQ connector. The -p flag publishes the container’s port 5672 on port 5672 of localhost, so RabbitMQ can communicate with the application through it.

To start the RabbitMQ server:

sudo /etc/init.d/rabbitmq-server start

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
  > log.txt 2> errorlog.txt

There are two connectors in the example: one that sends messages to RabbitMQ, and one that receives messages from the same queue. In the log messages, the arriving messages can be observed in the following format:

<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ

Apache Kafka

Pull the image:

sudo docker pull flinkstreaming/flink-connectors-kafka

To run the container, type:

sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
  flinkstreaming/flink-connectors-kafka

Now a terminal has started running from the image with all the necessary configurations to test-run the Kafka connector. The -p flags publish the container’s ZooKeeper (2181) and Kafka (9092) ports on localhost, so Kafka can communicate with the application through them. First, start ZooKeeper in the background:

/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
  > zookeeperlog.txt &

Then start the kafka server in the background:

/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
  > serverlog.txt 2> servererr.txt &

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
  > log.txt 2> errorlog.txt

In the example there are two connectors: one that sends messages to Kafka, and one that receives messages from the same topic. In the log messages, the arriving messages can be observed in the following format:

<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka

Program Packaging & Distributed Execution

See the relevant section of the DataSet API documentation.

Parallel Execution

See the relevant section of the DataSet API documentation.

Execution Plans

See the relevant section of the DataSet API documentation.

 
04-15 01:12