Overview
- A Quick Example
- Basic Concepts
- Linking
- Initializing StreamingContext
- Discretized Streams (DStreams)
- Input DStreams and Receivers
- Transformations on DStreams
- Output Operations on DStreams
- DataFrame and SQL Operations
- MLlib Operations
- Caching / Persistence
- Checkpointing
- Accumulators, Broadcast Variables, and Checkpoints
- Deploying Applications
- Monitoring Applications
- Performance Tuning
- Reducing the Batch Processing Times
- Setting the Right Batch Interval
- Memory Tuning
- Fault-tolerance Semantics
- Where to Go from Here
1. Overview 概述
Spark Streaming是核心Spark API的扩展,支持可扩展,高吞吐量,实时数据流的容错数据流处理。可以从sources(如Kafka、Flume、Kinesis、或者TCP sockets)获取数据,并通过复杂的算法处理数据,这些算法使用高级函数(如map,reduce,join和window)表示。最后,处理过的数据可以推送到文件系统、数据库和实时仪表板。事实上,你可以将Spark的机器学习和图形处理算法应用于数据流。
在内部,它的工作原理如下。Spark Streaming接受实时输入数据流,并把数据分成批,然后由Spark引擎处理,以批量生成最终结果流。
Spark Streaming提供了一个高层次的抽象,称为离散流或DStream,它代表连续的数据流。DStreams可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,一个DStream被表示为一系列RDD。
本指南将向你介绍如何开始使用DStreams编写Spark Streaming程序。你可以使用Scala,Java或Python编写Spark Streaming程序,所有这些都在本指南中介绍。你将可以在本指南中通过标签,选择不同语言的代码片段。
注意: 在Python中有少数不同或者不可用的APIs。通过本指南,你将会找到突出显示这些差异的Python API标签。
2. A Quick Example 快速学习示例
在我们进入如何编写属于你自己的Spark Streaming程序的细节之前,让我们快速浏览一个简单Spark Streaming程序是怎样的。假设我们想要统计从监听TCP socket的数据服务器接收到的文本数据中的字数。全部你需要做的是:
第一,我们将Spark Streaming类名和StreamingContext的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)中。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程的本地StreamingContext,批处理间隔为1秒。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
通过使用该context,我们可以创建一个DStream来表示来自TCP source的流数据,指定为主机名(例如localhost)和端口(例如9999)。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
该lines DStream表示从数据服务器接收到的流数据。在该DStream中的每条记录是一行文本。接下来,我们想要通过空白字节把行分割成单词。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap是一个一对多one-to-many的DStream操作,通过在源DStream把每条记录生成多个记录来创建一个新的DStream。在这种情况下,每行将会被分割成多个单词,单词流被表示为words DStream。接下来,我们想要统计这些单词。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words DStream进一步映射(一对一变换)为一个(字,1)对的DStream,然后通过reduceByKey获得每批数据中字的频率。最后,wordCounts.print()将会打印每秒生成的一些计数。
注意当执行这些lines时,Spark Streaming只会设置它在启动时执行的计算,并且尚未开始实际处理。在所有转换完成之后开始处理,我们最终调用
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整的代码可以在Spark Streaming示例NetworkWordCount中找到。
如果你已经准备下载和建立Spark,你可以运行这个例子。你首先需要通过使用nc -lk 9999运行Netcat(在大多数Unix系统中都有一个小实用程序)作为一个数据源服务器,然后在一个不同的终端中,你可以通过使用命令启动该例子:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
然后,在运行netcat服务终端上输入的任何行将会被计数并每秒在屏幕上打印。它看起来像下面那样:
3. Basic Concepts 基础概念
接下来,我们超越这个简单的例子,详细简述Spark Streaming的基础知识。
3.1 Linking 链接
与Spark类似,Spark Streaming可以通过Maven Central获得。编写你自己的Spark Streaming程序,你必须添加下述依赖到你的SBT或者Maven项目。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
</dependency>
为了从sources(如Kafka,Flume和Kinesis,这些sources不在Spark Streaming核心API中)获取数据,你必须将相应的artifact(工件)spark-streaming-xyz_2.11添加到依赖关系中。例如,一些常用的commons如下:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-10_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
有关最新的列表,请参阅Maven存储库以获取支持的sources和artifacts(工件)的完整列表。
3.2 Initializing StreamingContext 初始化StreamingContext
为了初始化Spark Streaming程序,必须创建一个StreamingContext对象,这是所有Spark Streaming功能的主要入口点。
一个StreamingContext对象可以从一个SparkConf对象创建:
import org.apache.spark._
import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName参数是你应用程序在集群UI上展示的名字。master是Spark,Mesos或者YARN集群URL,或者是以本地模式运行的特定“local[*]”字符串。实际上, 当在一个集群中运行时,你不希望在程序中hardcode(硬编码)master,而是使用spark-submit启动应用程序,并在那里接收它。然而,对于本地测试和单元测试,你可以通过"local[*]"运行进程内的Spark Streaming(检测本地系统中的核心数量)。注意这内部创建了一个SparkContext(Spark全部功能的起点),它可以作为ssc.sparkContext被访问。
批处理间隔必须根据你的应用程序和可用集群资源的延迟要求来设置。详情请看Performance Tuning节点。
一个StreamingContext对象也可以从一个现有的SparkContext对象中创建。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
当一个context(上下文)被定义后,你必须执行以下操作:
- 通过创建输入DStreams定义输入源。
- 通过将转换(transformation)和输出操作应用于DStreams来定义流计算。
- 开始接收数据并使用streamingContext.start()处理它。
- 使用streamingContext.awaitTermination()去等待处理停止(手动或由于任何错误)。
- 可以使用streamingContext.stop()去手动停止处理。
要点纪要:
- 一旦context(上下文)已经开始后,就不能建立或者添加新的流式计算。
- 一旦context(上下文)已经停止后,它就不能被重新启动。
- 在同一时间一个JVM虚拟机里只能有一个StreamingContext可以处于活动状态。
- StreamingContext上的stop()也可以停止SparkContext。为了仅停止StreamingContext,设置stop()可选参数stopSparkContext为false。
- 一个sparkContext可以被重新使用去创建多个StreamingContexts,只要先前的StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext)。
3.3 Discretized Streams(DStreams) 离散流
Discretized Streams或者DStream是Spark Streaming提供的基本抽象。它表示一个可持续的数据流,或者是从source接收的输入数据流,或者是通过转换输入流生成的处理过的数据流。在内部,DStream由连续的RDD系列表示,它是Spark对不可变的分布式dataset(数据集)的抽象(详情请看Spark Programming Guide)。DStream中的每个RDD都包含一定间隔的数据,如下图所示。
在DStream上应用的任何操作都会转换为对基础RDD的操作。例如,在将行转换为单词的早期例子中,在行DStream中的每个RDD上应用flatMap操作以生成单词DStream的RDD。如下图所示。
这些基础的RDD转换是通过Spark引擎计算的。DStream操作隐藏大部分细节,并为开发者提供了一个便利的更高级API。这些操作可在下个节点中讨论。
3.4 Input DStreams and Receivers 输入DStreams和接收者
Input DStream是表示从流来源接收的输入数据流的DStreams。在快速学习实例中,lines是一个输入DStreams,因为它表示从netcat服务器接收的数据流。每个输入DStream(除了文件流,在本节稍后讨论)与一个接收者对象相关联的,它从一个source源接收数据并将其存储在Spark的内存中进行处理。
Spark Streaming提供了两类内置的流sources(源)。
- 基本sources:在StreamingContext API中Sources直接可用。例如:文件系统和socket(套接字)连接。
- 高级sources:源(如Kafka,Flume,Kinesis等等)通过额外的使用类是可用的。这些要求添加额外的依赖包,这在linking节点中有过讨论。
我们将要在本节点后面讨论每个类别出现的一些来源sources。
注意,如果你想要在你streaming应用程序中并行接收多个数据源,你可以创建多个输入DStreams(在Performance Tuning节点进一步讨论)。这将会创建多个接收器,它将同时接收多个数据流。但是注意,Spark worker/executor是一个长期运行的任务,它占用了分配给Spark Streaming应用程序的内核其中之一。因此,重要的是要记住,Spark Streaming应用程序需要分配足够多的内核(或者线程,如果本地运行时)去处理接收到的数据,以及运行接收器。
记住要点:
- 当本地运行Spark Streaming项目时,不要使用“local”或者“local[1]”作为master URL。这些中的任何一个意味着本地只有一个线程用于运行任务。如果你使用基于接收器(如sockets、Kafka、Flume等)的输入DStream,然后单线程将会用于该接收器,而没有线程用于处理接收到的数据。因此,当本地运行时,总是使用"local[n]"作为master URL,这里n>运行的接收器的数量(详情查看Spark Properties关于如何设置master的信息)。
- 将逻辑扩展在集群上运行,分配给Spark Streaming应用程序的内核数量必须大于接收器的数量。否则系统只会接收到数据,但无法处理。
3.4.1 Basic Sources 基本来源
在从一个TCP socket(套接字)连接接收到的文本数据创建DStream的快速学习实例中,我们已经了解过ssc.socketTextStream(...)。除了sockets(套接字),StreamingContext API还提供了从文件去创建DStreams作为输入sources的方法。
File Steam文件流:为了从与HDFS API(即HDFS,S3,NFS等)兼容的任何文件系统上的文件读取数据,可以创建DStream为:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming将会监控dataDirectory目录并处理该目录下创建的任何文件(不支持嵌套目录编写的文件)。注意:
- 文件必须有相同的数据格式。
- 文件必须通过在数据目录中原子移动或重命名在dataDirectory中创建。
- 一旦移动,文件必须不能更变。因此如果文件内容被连续追加,新的数据将不会被读取。
对于简单的文本文件,这里有一个更简单的方法streamingContext.textFileStream(dataDirectory)。而文件流不要求运行接收器,因此不要求分配内核。
Python API:fileStream在Python API中不可用,只有textFileStream可用。
Streams based on Custom Receivers基于自定义接收器的流:DStreams可以通过自定义接收器接收到的数据流创建。详情查看Custom Receiver Guide。
Queue of RDDs as a Stream RDD队列作为流:对于使用测试数据测试Spark Streaming应用程序,还可以使用streamingContext.queueStream(queueOfRDDs)基于RDD队列创建DStream。推入队列的每个RDD都将被视为DStream中的一批数据,并像流stream一样进行处理。
更多关于从sockets和filesd的流的详细信息,请参阅Scala的StreamingContext、Java的JavaStreamingContext和Python的StreamingContext中有关功能的API文档。
3.4.2 Advanced Sources 高级Sources
Python API:从Spark2.2.1开始,在这些源中,Kafka、Kinesis和Flume都可以在Python API中使用。
这类资源sources要求与外部的非Spark类库(其中一些具有复杂的依赖关系)进行交互(例如,Kafka和Flume)。因此,为了尽量减少与依赖包版本冲突有关的问题,从这些源sources创建DStreams的功能已经被移动到可以在必要时显式链接的独立库。
注意这些高级源sources在Spark shell是不可用的,因此基于这些高级源sources的应用程序不能在shell中测试。如果你真的想要在Spark shell使用它们,你必须下载相应的Maven工件artifact的JAR以及它的依赖关系包,并将它添加到类路径中。
这些高级源sources中的一些如下:
- Kafka:Spark Streaming 2.2.1与Kafka broker版本0.8.2.1或更高版本兼容。详情请看Kafka Integration Guide。
- Flume:Spark Streaming2.2.1与Flume1.6.0兼容。详情请看Flume Integration Guide。
- Kinesis:Spark Streaming2.2.1与Kinesis Client Library1..2.1兼容。详情请看Kinesis Integration Guide。
3.4.3 Custom Source自定义source
Python API 这个在Python还不能支持。
输入DStreams也可以创建自定义数据源。你需要去做的是实现用户自定义的接收器(看下一节点了解它是什么),它可以接收来自自定义sources的数据并把它们推送到Spark里面。详情请看Custom Receiver Guide。
3.4.4 Receiver Reliability 接收器可靠性
基于其可靠性可以有两种数据源sources。源Sources(如Kafka和Flume)允许传输的数据被确认。如果从这些可靠的来源sources接收数据的系统正确地确认接收到的数据,则可以确保没有数据由于任何故障而丢失。这导致两种接收器:
- 可靠的接收器 —— 当数据已经被接收并存储在具有复制的Spark中时,可靠的接收器正确地向可靠的来源source发送确认。
- 不可靠的接收器 —— 不可靠的接收器不会向来源source发送确认信息。这可以用于不支持确认的来源sources,甚至可以用于不希望或不需要进行复杂性确认的可靠来源。
详情在Custom Receiver Guide查看如何编写可靠的接收器。
3.5 Transformations on DStreams 在DStreams上进行转换
类似于RDD,转换允许来自输入DStream的数据被修改。DStreams支持在正常Spark RDD上的很多转换。详情如下:
Transformation | Meaning |
---|---|
map(func) | 通过函数func传递来源source DStream的每个元素来返回一个新的DStream |
flatMap(func) | 类似于map,但是每个输入item可以映射为0或多个输出items |
filter(func) | 通过仅选择func返回true的那个来源source DStream的记录来返回一个新的DStream |
repartition(numPartitions) | 通过创建或多或少的分区来更改此DStream的并行性级别 |
union(otherStream) | 返回一个新的DStream,其中包含来源source DStream和其他DStream中元素的联合 |
count() | 通过统计来源source DStream的每个RDD的元素数量来返回一个新的单一元素RDD的DStream |
reduce(func) | 通过使用函数func(它带有两个参数并返回一个)来聚合源source DStream的每个RDD中的元素,从而返回一个新的单元素RDD的DStream。函数应该是关联和可交换的,以便可以并行计算。 |
countByValue() | 当在元素类型为K的DStream上调用时,返回一个新的(K,Long)对的DStream,其中每个键的值是它在来源source DStream的每个RDD中的频率。 |
reduceByKey(func, [numTasks]) | 当在元素类型为(K,V)对的DStream上调用时,返回一个新的(K,V)对的DStream,其中每个键的值使用给定的reduce函数进行聚合。注意:默认情况下,它使用Spark的默认并行任务数(2表示本地模式,而在集群模式下,数字由config属性spark.default.parallelism决定)进行分组。你可以传递一个可选的numTasks参数来设置不同数量的任务。 |
join(otherStream, [numTasks]) | 当在两个元素类型分别为(K,V)和(K,W)的DStreams上调用时,为每个键的所有元素对,返回一个新的元素类型为(K,(V,W))对DStream |
cogroup(otherStream, [numTasks]) | 当在一个元素类型为(K,V)和(K,W)对的DStream上调用时,返回一个新的元素类型为(K, Seq[V], Seq[W])元组的DStream |
transform(func) | 通过将RDD-to-RDD函数应用于来源source DStream的每个RDD来返回一个新的DStream。这可以用来在DStream上执行任意RDD操作 |
updateStateByKey(func) | 返回一个新的“状态”DStream,其中通过对键的先前状态和键的新值应用给定的函数来更新每个键的状态。这可以用来维护每个键的任意状态数据 |
这些转换中一些是值得更详细地讨论的。
3.5.1 UpdateStateByKey Operation UpdateStateByKey操作
updateStateByKey操作允许你维护任意状态,同时不断更新新信息。为了使用这个,你将不得不做两个步骤:
- 定义状态 —— 状态可以是任意的数据类型。
- 定义状态更新函数 —— 用函数指定如何使用之前的状态来更新状态以及如何更新输入流中的新值。
在每个批处理中,Spark将状态更新函数(或功能)应用于全部现有的密钥key,而不管批处理中是否有新数据。如果更新函数(或功能)返回None,那么键值对将被消除。
让我们用例子说明。假设你想在文件数据流中保持看到的每个单词的运行数量。这里,运行的数量是状态并且它是一个整型。我们定义更新函数如:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
这是在包含单词的DStream中应用(假设,在早期示例中对DStream包含(word,1)对)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函数将会被每个单词调用,newValues具有1的序列(来自(word,1)对),并且runningCount具有前一个计数。
注意使用updateStateByKey要求配置好checkpoint目录,详情参阅checkpointing节点。
3.5.2 Transform Operation 转换操作
transform转换操作(带有它的变量如transformWith)允许任意RDD-to-RDD函数应用于DStream。它可以用于应用任何未在DStream API中公开的RDD操作。例如,在数据流中的每个批次与其他数据集dataset连接起来的功能不会直接暴露在DStream API中。然而,你可以很容易地使用transform完成此操作。这可能性非常强大。例如,可以通过将输入数据流与预先计算的垃圾信息(也可以使用Spark生成)进行实时数据清理,然后基于此进行过滤。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
注意,提供的函数在每个批处理间隔中被调用。此函数允许你执行随时间变化的RDD操作,即可以在批次之间更改RDD操作、分区数量、广播变量等等。
3.5.3 Window Operations Window操作
Spark Streaming也提供窗口化计算,这允许你在滑动的数据窗口上应用转换transformations。下图说明该滑动窗口。
就像上图展示的那样,每当窗口滑过源Source DStream时,窗口内的源Source RDD被组合并操作以产生窗口DStream的RDD。在这个特定情况中,该操作应用涵盖在最后3个时间单位的数据上并滑过2个时间单位。这表明任何窗口操作需要指定两个参数。
- 窗口长 —— 窗口的持续时间(图表中的3)
- 滑动间隔 —— 窗口操作的执行间隔(图中的2)
这个两个参数必须是源source DStream(图中的1)的批处理间隔的倍数。
让我们用个例子说明窗口操作。假设你想要通过在最后的30秒数据中每10秒生成一个字数来扩展前面的示例。为了做到这一点,我们必须在最后的30秒数据内在(word,1)对的DStream对上应用reduceByKey操作。这是使用reduceByKeyAndWindow操作完成的。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些普通窗口操作如下。这些操作全部都有上述说的两个参数——windowLength和slideInterval。
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | 基于在源source DStream上的窗口化批处理计算来返回一个新的DStream。 |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口数量。 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素流,它是通过使用func经过滑动间隔聚合流中的元素来创建的。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当在元素类型为(K,V)对的DStream调用时,返回一个新的元素类型为(K,V)对的DStream,其中每个key键的值在滑动窗口中使用给定的reduce函数func来进行批量聚合。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上述reduceByKeyAndWindow()的一个更高效的版本,其中每个窗口的reduce值是使用前一个窗口的reduce值递增计算的。这是通过减少进入滑动窗口的新数据并“反转减少”离开窗口的旧数据来完成的。示例如当窗口滑动时“增加并减少”key键的数量。然而,它仅适用于“可逆减函数”,即具有相应“反减inverse reduce”函数的函数(作为参数invFunc)。如reduceByKeyAndWindow,reduce任务的数量是通过一个可选参数来设置的。注意使用这个操作checkpointing必须是能启用的。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 当在元素类型为(K,V)对的DStream上调用时,返回一个新的元素类型为(K,Long)对的DStream,其中每个key键的值为它在一个滑动窗口出现的频率。如在reduceByKeyAndWindow中,reduce任务的数量是通过一个可选参数设置的。 |
3.5.4 Join Operations连接操作
最终,值得强调的是,你可以轻松地在Spark Streaming中执行不同类型的连接Joins。
Stream-stream joins 流——流连接
流可以很容易地加入到其他流中。
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
这里,在每个批处理间隔中,stream1生成的RDD将会连接到stream2生成的RDD。你也可以执行leftOuterJoin、rightOuterJoin、fullOuterJoin。此外,在流的窗口上进行连接经常是非常有用的。这是非常容易的。
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
Streams-dataset joins 流——数据集连接
当在前面说明DStream.transform操作时,这已经展示了。这里是将窗口流与数据集dataset连接的另一个实例。
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
事实上,你也可以动态改变你想要连接的数据集dataset。提供转换的函数在每个批处理间隔中进行评估,因此将使用数据集引用指向的当前数据集dataset。
API文档提供了DStream转换的完整列表。对于Scala API,请看DStream和PariDStreamFunctions。对于Java API,请看JavaDStream和JavaPairDStream。对于Python API,请看DStream。
3.6 Output Operations on DStreams 在DStreams上的输出操作
输出操作允许DStream的数据被推送到外部系统(如数据或者文件系统)。由于输出操作事实上允许外部系统消费转换过的数据,因此它们会触发所有DStream转换的实际执行(类似于RDD的操作)。当前,输出操作定义如下:
Output Operation | Meaning |
---|---|
print() | 在运行streaming应用程序的驱动节点上的DStream中打印每批数据的前10个元素。这对开发和调试很有用。 Python API:在Python API中叫做pprint()。 |
saveAsTextFiles(prefix, [suffix]) | 把DStream的内容保存为文本文件。每个批次间隔的文件名是根据前缀prefix和后缀suffix:“prefix-TIME_IN_MS[.suffix]”生成的。 |
saveAsObjectFiles(prefix, [suffix]) | 把此DStream的内容保存为序列化Java对象的序列文件。每个批次间隔的文件名是根据前缀prefix和后缀suffix:“prefix-TIME_IN_MS[.suffix]”生成的。 |
saveAsHadoopFiles(prefix, [suffix]) | 把此DStream的内容保存为Hadoop文件。每个批次间隔的文件名是根据前缀prefix和后缀suffix:“prefix-TIME_IN_MS[.suffix]”生成的。 Python API 这个在Python API中不可用。 |
foreachRDD(func) | 最通用的输出操作,将函数func应用于从流中生成的每个RDD。此功能应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。注意,函数func在流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,强制执行流RDD的计算。 |
3.6.1 Design Patterns for using foreachRDD
dstream.foreachRDD是一个功能强大的原语primitive,它允许将数据发送到外部系统。然而,了解如何正确并高效地使用原语primitive是很重要的。如下可以避免一些普通错误。
通常写数据到外部系统要求创建一个连接对象(例如远程服务器的TCP连接)并使用它发送数据到远程系统。为了达到这个目的,开发者可能会不小心尝试在Spark驱动程序中创建连接对象,然后尝试在Spark worker中使用它来讲记录保存在RDD中。如scala中的示例。
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
这是不正确的,因为它要求连接对象可以被序列化并从driver端发送到worker端。这种连接对象很难跨机器传输。这个错误可能表现为序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在worker端初始化)等。正确的解决方法是在worker端创建连接对象。
然而,这可能导致另一个普遍的错误 —— 为每条记录都创建一个新的连接。例如:
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
典型地做法是创建一个连接对象有时间和资源的开销。因此,为每条记录创建和销毁连接对象可能会产生不必要的高开销,并且会显著地降低系统的整体吞吐量。一个更换的解决方案是使用rdd.foreachPartition —— 创建一个单连接对象并在RDD分区使用该连接发送所有的记录。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
这会缓解许多记录中的连接创建开销。
最终,通过在多个RDD/批次重用连接对象,可以进一步优化这个功能。我们可以维护一个静态的可重用的连接对象池,因为多个批处理的RDD被推送到外部系统,从而进一步降低了开销。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
注意在连接池中的连接应该按需延迟创建,并且如果不使用一段时间则超时。这实现了将数据最有效地发送到外部系统。
其他要求需要记住:
- DStreams通过输出操作延迟执行,比如RDD通过RDD actions延迟执行。具体来说,DStream输出操作中的RDD actions会强制处理接收到的数据。因此,如果你的应用程序没有任何输出操作,或者有输出操作但没有任何RDD action在里面,如dstream.foreachRDD(),则不会执行任何操作。系统将会简单地接收数据并丢弃它。
- 默认的,输出操作时一次一个执行的。而且它们按照在应用程序中定义的顺序执行。
3.7 DataFrame and SQL Operations 数据框和SQL操作
你可以在streaming流数据上很容易地使用DataFrame数据框和SQL操作。你必须通过使用StreamingContext正在使用的SparkContext创建SparkSession。此外,必须这样做才能在驱动程序故障时重新启动。这是通过创建SparkSession的一个延迟的实例化单例实例来完成的。这在下面的实例展示。它通过使用DataFrames和SQL来修改前面的word count例子来生产单词数量。每个RDD都可转换为一个DataFrame,注册为一个临时表,然后使用SQL进行查询。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._ // Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word") // Create a temporary view
wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
完整源代码请看 source code.
你也可以在来自不同线程(即与正在运行的StreamingContext异步)的streaming流数据定义的表上运行SQL查询。只要确保你将StreamingContext设置为记住足够多的streaming流数据,以便查询可以运行。另外,不知道任何异步SQL查询的StreamingContext将会在查询完成之前删除旧streaming流数据。例如,如果你想要查询最后一批,但是查询可能花费5分钟才能运行,请调用streamingContext.remember(Minutes(5))(以Scala或其他语言的等效方式)。
参阅DataFrames and SQL指南去更多地了解DataFrames。
3.8 MLlib Operations MLlib操作
你可以很容易地使用MLlib提供的机器学习算法。首先,streaming流机器学习算法(例如流式线性回归Streaming Linear Regression,Streaming KMeans等等)可以从steaming流数据中学习并在将模型应用在streaming流数据上。除此之外,对于机器学习算法更大的类,你可以离线学习一个学习模型(即使用历史数据),然后将线上模型应用于streaming流数据。详情参阅MLlib指南。
3.9 Caching/Persistence 缓存/持久化
类似于RDD,DStream也允许开发者在内存中持久化stream流数据。也就是说,在DStream使用persist()方法将会自动在内存中持久化DStream的每个RDD。如果在DStream中的数据将被多次计算(例如,对同一数据进行多次操作),这将非常有用。对于像reduceByWindow和reduceByKeyAndWindow这样的基于窗口操作和像updateStateByKey这样的基于状态操作,这无疑问都是有用的。因此,生成基于窗口操作的DStreams是在内存中自动持久化的,没有通过开发者调用persist()。
对于通过网络接收数据的输入流(如Kafka,Flume,sockets等等),默认的持久化级别设置为将数据复制到两个节点以实现容错。
注意,不像RDD,DStreams默认的持久化级别是在内存中保持数据序列化。这可以在Performance Tuning节点进一步讨论。更多关于不同持久化级别的详情可以在Spark Programming Guide找到。
3.10 Checkpointing 检查点
一个streaming流应用程序必须全天运行,因此必须对于应用程序逻辑无关的故障(例如系统故障,JVM崩溃等)具有恢复能力。为了实现这个可能性,Spark Streaming需要checkpoint检查点足够的信息到一个容错存储系统,以便它可以从故障中恢复。这里有两种检查的数据类型。
- Metadata checkpointing元数据检查点 —— 将定义流式计算的信息保存到容错存储系统如HDFS。这用于从运行streaming流应用程序的驱动程序的节点上的故障中恢复(稍候详细讨论)。元数据包含:
- 配置—— 用于创建streaming流应用程序的配置。
- DStream 操作 —— 定义streaming流应用程序的DStream操作集合。
- 没完成的批处理 ——批处理的jobs在排队但尚未完成。
- Data checkpointing 数据检查点—— 将生成RDD保存在可靠的存储中。这在将多个批次的数据组合在一起的有状态转换中是必须的。在这样的转换中,生成的RDD依赖于之前批次的RDD,导致依赖链的长度随着时间的推移而不断增加。为了避免恢复时间的这种无限增长(与依赖链成比例),有状态转换的中间RDD被周期性地检查为可靠存储(如HDFS)以切断依赖链。
总而言之,metadata元数据checkpointing主要用于从驱动程序故障中恢复,而数据或RDD checkpointing对于使用有状态转换的基本功能是必须的。
3.10.1 When to enable Checkpointing 什么时候启用checkpointing
必须为具有以下任何要求的应用程序启用Checkpointing:
- 有状态转换的用法 —— 如果在应用程序中使用updateStateByKey或者reduceByKeyAndWindow(带有反转函数),那么必须提供checkpointing目录以允许定期RDD checkpointing。
- 从运行应用程序的驱动程序故障中恢复 —— Metadata元数据checkpoints用于恢复进度信息。
注意没有上述状态转换的简单streaming流应用程序可以不通过启用checkpointing运行。从驱动程序故障中恢复也是这种情况下的一部分(一些接收到但没有处理的数据可能丢失)。这通常是可以接受的,很多人以这种方式运行Spark Streaming应用程序。
3.10.2 How to configure Checkpointing 如何配置Checkpointing
Checkpointing可以通过在容错,可靠的文件系统(如HDFS、S3等)中设置一个目录启用,checkpoint信息将被保存在该文件系统中。这是通过使用streamingContext.checkpoint(checkpointDirectory)来完成的。这将允许你使用上述提及的状态转换。另外,如果你想要应用程序从驱动程序的故障中恢复,你应该重写你的streaming流应用程序以使其具有下述行为:
- 当程序第一次启动时,它将会创建一个新的StreamingContext,设置所有streams流,然后调用start()。
- 当程序在失败之后重新启动时,它将会根据checkpoint目录中的checkpoint数据重新创建一个StreamingContext。
这个行为通过使用StreamingContext.getOrCreate变得简单。用法如下:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
} // Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ... // Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory存在,那么context上下文将会根据checkpoint数据重新创建。如果该目录不存在(即第一次运行),那么函数functionToCreateContext将会被调用去创建新的context上下文,并设置DStreams。参阅Scala示例RecoverableNetworkWordCount。这个例子把网络数据的单词数量添加到一个文件中。
除了使用getOrCreate之外,你也需要确保驱动程序进程失败后自动重新启动。这个只能通过部署用于运行应用程序的基础设施来完成。这个在部署节点进一步讨论。
注意RDD的checkpointing导致了存储到可靠存储的成本。这个可能导致RDD checkpoint的批次处理时间延长。因此checkpointing的时间间隔需要小心设置。在小批量(例如1秒)的情况下,每个批次的checkpointing可能会显著降低操作吞吐量。相反,checkpointing国语频繁导致lineage和任务规模增长,这可能有不利影响。
对于要求RDD checkpointing的状态转换,默认时间间隔是至少10秒的批处理间隔的倍数。它可以通过使用dstream.checkpoint(checkpointInterval)来设置。通常情况下,一个DStream的5到10个滑动间隔的checkpoint间隔设置是一个很好的尝试。
3.11 Accumulators, Broadcase Variables, and Checkpoints 累加器,广播变量和Checkpoints
累加器和广播变量在Spark Streaming中不能从checkpoint恢复。如果你启用checkpointing并使用累加器或者广播变量,你将必须为累加器和广播变量延迟创建实例化的单例实例,以便在驱动程序由于故障重新启动后它们可以被重新实例化。下面例子可以参阅:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
} object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
} wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
完整源代码查看source code.
3.12 Deploying Applications 部署应用程序
这个节点讨论部署一个Spark Streaming应用程序的步骤。
3.12.1 Requirements 要求
为了运行一个Spark Streaming应用程序,你需要有下述内容:
带有集群管理器的集群 —— 这是任何Spark应用程序的普遍要求,详情在deployment guide节点讨论。
- 打包应用程序JAR —— 你必须将你的streaming流式应用程序编译为JAR。如果你正在使用spark-submit去启动应用程序,那么你将不必要在JAR中提供Spark和Spark Streaming。然而,如果你的应用程序使用高级advanced sources(例如Kafka、Flume),那么你将必须在用于部署该应用程序的JAR中打包其链接的第三方artifact工件以及它们的依赖包。例如,一个使用KafkaUtils的应用程序必须在应用程序JAR中包含有spark-streaming-kafka-0-8_2.11及其所有依赖项。
- 为executors配置足够内存 —— 因为接收到的数据必须存储在内存中,executors必须配置足够的内存去持有接收到的数据。注意如果你执行10分钟的窗口操作,系统必须在内存中保留至少10分钟的数据。因此应用程序的内存要求取决于在其中使用的操作。
- 配置checkpointing —— 如果流应用程序要求,那么必须将在Hadoop API兼容的容错存储(例如HDFS,S3等)中的目录配置为checkpoint目录。详情参阅checkpointing节点。
- 配置应用驱动程序的自动重启 —— 为了从驱动程序故障中自动恢复,用于运行streaming流应用程序的部署基础设施必须监控驱动程序进程并当它故障时重新启动。不同集群管理有不同工具去实现它。
- Spark Standalone Spark单例—— 在Spark单例集群中提交Spark应用驱动程序并运行(请参阅集群部署模式),即,应用驱动程序本身在worker节点中的一个上运行。此外,单例集群管理器可以被指示监控驱动程序,并且如果驱动程序由于非零退出代码或者由于运行驱动程序的节点的故障而失败,则重新启动它。详情参阅Spark Standalone指南中的集群模式和监控。
- YARN —— Yarn 支持类似的机制来自动重启应用程序。详情参阅YARN文档。
- Mesos —— Marathon已经被用来与Mesos实现这个功能。
- 配置预写日志 —— 自从Spark 1.2,我们引入了预写日志用于实现强大的容错保证。如果启用,所有从接收器接收到的数据都可以写入配置好的checkpoint目录中的预写日志。这可以防止驱动程序恢复时的数据丢失,从而确保零数据丢失(详情参阅容错语义节点)。这可以通过设置配置属性spark.streaming.receiver.writeAheadLog.enable为true来启用。然而,这些更强的语义semantics可能是以单个接收器的接收吞吐量为代价的。这可以通过并行运行更多的接收器去提高总吞吐量来纠正。此外,建议当预写日志启用时,禁用Spark中接收数据的复制,因为日志已经存储在复制存储系统中。这可以通过设置输入流的存储级别为StorageLevel.MEMORY_AND_DISK_SER来完成的。当使用S3(或者任何不支持刷新的文件系统)写预写日志时,请记住启用spark.streaming.driver.writeAheadLog.closeFileAfterWriter和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。详情参阅Spark Streaming Configuration。注意当启用I/O加密时,Spark不会加密写入预写日志中的数据。如果期望预写日志中的数据加密,那么它存储的文件系统应该支持本地加密。
- 设置最大接收率 —— 如果集群资源不够大,流式应用程序无法像接收数据一样快速处理数据,接收器的接收速率可以通过以records/sec为单位设置最大速率限制来进行限制。详情参阅配置参数接收器的spark.streaming.receiver.maxRate和Direct Kafka approach的spark.streaming.kafka.maxRatePerPartition。在Spark 1.5,我们引入了一个叫backpressure的特性,无需设置速率限制,因为Spark Streaming会自动计算速率限制,并在处理条件发生变化时动态调整速率限制。这个backpressure可以通过设置配置参数spark.streaming.backpressure.enabled为true来启用。
3.13 Upgrading Application Code 升级应用程序代码
如果正在运行的Spark Streaming应用程序需要升级新的应用程序代码,那么这里有两种可能的机制。
升级的Spark Streaming应用程序启动并与现有的应用程序是并行运行的。一旦新的(正在接收相同的数据与旧的一样)已经预热并准备好迎接黄金时段,旧的就可以被取消。注意这可以为支持将数据发送到两个目标(即早期和已已升级的应用程序)的数据源完成。
现有的应用程序正常关闭(请参阅StreamingContext.stop(...)或JavaStreamingContext.stop(...)以获取正常关闭选项),以确保已接收到的数据在关闭之前完全处理完毕。然后可以启动升级的应用程序,它将会从早期应用程序中断的同一点开始处理。注意这只能通过支持source-side源端缓存(如Kafka和Flume)的输入源来完成,因为当前一应用程序关闭而升级的应用程序还没启动时,需要缓存数据。从升级前的代码的更早checkpoint信息重新启动是不能完成的。checkpoint信息本质上包含序列化的Scala/Java/Python对象并试图用新的修改后的类来反序列化对象可能会导致错误。在这种情况下,可以使用不同的checkpoint目录来启动升级的应用程序,也可以删除以前的checkpoint目录。
3.14 Monitoring Applications 监控应用程序
除了Spark的监控功能外,Spark Streaming还有其他特定功能。当使用StreamingContext时,Spark web UI会显示一个额外的Streaming选项卡,它展示了关于正在运行的接收器(无论接收器是否处于活动状态,接收到的记录数量,接收器错误等等)和已完成批次(批处理次数,队列延迟等等)的统计信息。这可以用于监控streaming流应用程序的进程。
Web UI中的以下两个指标尤为重要:
- 处理时间 —— 每批次数据的处理时间。
- 调度延迟 —— 批次在队列中等待前一批次处理完成的时间。
如果批处理时间一直超过批处理间隔或者排队延迟时间持续增加,则表示当批次生成时系统无法快速地处理它们,并且落后。在这种情况下,考虑降低批处理时间。
Spark Streaming程序的处理也可以使用StreamingListenner接口进行监控,它允许你得到接收器状态和处理时间。注意这是个开发者API并且将来还会改进。
4. Performance Tuning 性能调整
从集群上的Spark Streaming应用程序中获得最佳性能需要进行一些调整。本节点介绍可调整的一些参数和配置,以提高应用程序的性能。在高层次上,你需要考虑两件事:
- 通过有效地利用集群资源来降低每批次数据处理时间。
- 设置正确的批处理大小,以便数据的批处理可以像它们接收时一样快速处理(即,数据处理能跟上获取数据)。
4.1 Reducing the Batch Processing Times 降低批处理时间
Spark中有很多优化可以使每个批次的处理时间最短。详情已在Tuning Guide讨论。本节重点介绍一些最重要的内容。
4.1.1 Level of Parallelism in Data Receiving 数据接收中的并行性水平
通过网络接收数据(如Kafka,Flume,socket等等)要求将数据反序列化并存储在Spark中。如果数据接收成为系统中的瓶颈,那么考虑并行化数据接收。注意每个输入DStream创建一个简单接收器(在一台worker机器上运行)去接收单一数据流。通过创建多个输入DStreams并配置它们以接收来自source源的不同分区的数据流来完成接收多个数据流。例如,接收两个topics数据的单一Kafka输入DStream可以被分割成两个Kafka输入流,每个只接收一个topic。这会运行两个接收器,它们允许数据并行接收,因而提高整体的吞吐量。这些多个DStreams可以结合在一起创建一个DStream。那么应用于单一输入DStream的转换也可以应用于统一流上。步骤如下:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一个应该考虑的参数是接收器的块间隔,它是通过配置参数spark.streaming.blockInterval来决定的。对于大多数接收器,接收的数据在存储到Spark内存之前被合并成数据块。每个批处理中块的数量决定了将要用于在类似map的转换处理接收到的数据的任务数量。每个接收器每个批次任务的数量将近似(批间隔/块间隔)。例如,200ms的块间隔将每2秒批次创建10个任务。如果任务的数量太少(即少于每台机器内核数量),那么它将是效率低的,因为全部可用的内核没有用于处理数据。为给定的批次间隔提高任务的数量,降低块间隔。但是,推荐块间隔最小值大概为50ms,低于该值时,任务启动开销可能会成为问题。用多个输入流/接收器接收数据的另一种方法是显式地重新分配输入数据流(使用inputStream.repartition(<分区数>))。在进一步处理之前,这将在集群中指定的数量的机器上分配接收到的批量数据。
4.1.2 Level of Parallelism in Data Processing 数据处理中的并行水平
如果在计算的任何阶段使用使用的并行任务不够高,则集群资源可能未被充分利用。例如,对于分配的reduce操作,如reduceByKey和reduceByKeyAndWindow,默认并行任务的数量是通过spark.default.parallelism配置属性控制的。你可以将并行级别作为参数传递(参阅PairDStreamFunctions文档),或设置spark.default.parallelism配置属性以更改默认值。
4.1.3 Data Serialization 数据序列化
数据序列化的开销可以通过调整序列化格式来减少。在streaming流的这种情况下,有两种数据类型需要被序列化。
- 输入数据:默认的,通过接收器接收到的输入数据通过StorageLevel.MEMORY_AND_DISK_SER_2存储在executors的内存中。即,将数据序列化为字节以降低GC开销,并复制用于容忍执行程序失败。此外,数据首先保存在内存中,并且只有当内存不足以保存流式计算所需的所有输入数据时才会溢出到磁盘。这些序列化有明显的开销 —— 接收器必须反序列化接收到的数据,并使用Spark的序列化格式重新序列化它。
- Persisted RDDs generated by Streaming Operations: 通过流式计算生成的RDDs可能在内存中持久化。例如,窗口操作在内存中持久化数据因为它们会被处理多次。然而,与StorageLevel.MEMORY_ONLY的Spark Core默认值不同,通过流式计算生成的持久化RDDs默认通过StroageLevel.MEMORY_ONLY_SER(即序列化)持久化以最大程度地降低GC开销。
在这两种情况下,使用Kryo序列化可以降低CPU和内存的开销。详情参阅Spark Tuning Guide。对于Kryo,考虑注册自定义类,并禁用对象引用跟踪(参阅配置指南中的与Kryo相关的配置)。在特定情况下,需要为流应用程序保留的数据量不大,可以将数据(两种类型)作为反序列化的对象保持,而不会导致过多的GC开销。例如,如果你正在使用几秒的批处理间隔并且没有窗口操作,那么你可以尝试通过显式设置存储级别来禁用持久数据中的序列化。这可以减少由于序列化造成的CPU开销,可能在不增加太多GC开销的情况下提高性能。
4.1.4 Task Launching Overheads 任务启动开销
如果每秒任务启动的数量很高的话(比如说每秒50或更多),那么发送任务到集群上的开销可能就很大并且很难达到亚秒级的等待时间。开销可以通过下述改变来降低:
- Execution mode:在单例模式或者coarse-grained Mesos模式上运行Spark造成任务启动时间比fine-grained Mesos模式快。详情参阅Running on Mesos guide。
这些更改可能会使批处理时间减少100毫秒,从而运行亚秒级批处理大小可行。
4.2 Setting the Right Batch Interval 设置正确的批处理间隔
对于运行在集群上Spark Streaming应用程序要稳定,系统应该有能力像接收数据一样快速地处理数据。换一句话说,批量数据应该像它们生成一样快速地处理。通过监控streaming web UI中的处理时间,其中批处理时间应该小于批处理间隔的处理时间,可以知道应用程序是否正确。
根据流式计算的性质,使用的批处理间隔可能会对一组固定的集群资源上的应用程序可以维持的数据速率产生重大的影响。例如,我们考虑前面的WordCountNetwork例子。对于特定的数据速率,系统可以保持每2秒(即2秒的间隔时间)汇报一次单词字数,但不是每500毫秒。因此批处理间隔需要设置为生成环境可以维持的期待中的数据速率。
一个好的途径去为你的应用程序找出正确的批处理大小是用一个保守的批处理间隔(比如5-10秒)和低数据速率进行测试。为了验证系统是否有能力维持数据速率,你可以校验通过每个处理过的批次所经历的端到端end-to-end延迟的值(要么在Spark驱动程序log4j日志中查找“总延迟”,要么使用StreamingListener接口)。如果延迟保持与批量大小相当,那么系统是稳定的。否则,如果延迟是持续增加,这意味着系统不能跟上,因而它是不稳定的。一旦你有了稳定配置的主意,你可以试图提高数据速率或者降低批量大小。注意,只要延迟降低到低值(即,小于批量大小),由于临时数据速率增加而引起的延迟的暂时增加就可以是正常的。
4.3 Memory Tuning 内存调整
详情参阅Tuning Guide中关于调整Spark应用程序的内存使用情况和GC行为。强烈要求你参阅。在本节点,我们在Spark Streaming应用程序讨论一些特定的调整参数。
Spark Streaming应用程序要求的集群内存总量严重决定于使用的转换类型。例如,如果你想要在最后10分钟的数据中使用窗口操作,那么你的集群应该有足够的内存来存储10分钟内存中的数据。或者如果你想要将updateStateByKey用于大量的keys,那么必要的内存将会很高。相反,如果你想要执行简单的map-filter-store操作,那么需要的内存很低。
普遍来说,因为通过接收器接收到的数据是使用StorageLevel.MEMORY_AND_DISK_SER_2来存储,在内存中不适合的数据将会溢出到磁盘。这将会降低streaming流应用程序的性能,因此建议当你的streaming流应用程序需要时提供足够的内存。最好尝试一下小规模的内存使用情况并做相应的估计。
内存调整的另一种方法是垃圾回收。对于一个要求低延迟的streaming流应用程序,通过JVM垃圾回收造成大量的暂停,这是不希望的。
这里有几个参数可以帮助你调整内存使用情况和GC开销:
- Persistence Level of DStreams : 在Data Serialization节点提及,输入数据和RDDs默认是作为序列化字节保存的。与反序列化持久化保存相比,这降低内存使用和GC开销。启用Kryo序列化进一步降低序列化大小和内存使用。进一步降低内存的使用可以通过压缩来实现(参阅Spark配置spark.rdd.compress),代价是CPU时间。
- Clearing old data: 默认,通过DStream转换生成的所有的输入数据和持久化的RDD自动被清除。Spark Streaming决定什么时候清除基于使用的转换的数据。例如,如果你正在使用10分钟的窗口操作,那么Spark Streaming将会存储大约最后10分钟的数据,并且积极清除旧数据。通过设置streamingContext.remember,数据可以保留一个更长的时间(例如交互式查询旧数据)。
- CMS Garbage Collector: 强烈建议使用并发的mark-and-sweep GC,以保持GC相关的暂停时间始终低。即使并发GC对于降低系统整体处理吞吐量已知,但仍然推荐使用并行GC来实现更一致的批处理时间。确保你在driver端(在spark-submit使用--driver-java-options)和executors端(使用spark配置spark.executor.extraJavaOptions)设置CMS GC。
- Other tips:为了进一步降低GC开销,这里有更多的技巧可以尝试。
- 使用OFF_HEAP存储级别来持久化RDDs。详情参阅Spark Programming Guide。
- 通过更小的heap堆大小使用更多的executors。
4.3.1 Important points to remember:
- DStream与单个接收器相关联。为了达到并行读取多个接收器,即创建多个DStream。在一个executor中运行一个接收器。它占用一个内核。确保在预订接收器后有足够的内核进行处理,即spark.cores.max应考虑接收器花费的耗时。接收器以循环方式分配给executors。
- 当从流来源接收到的数据,接收器创建数据块。每隔blockInterval毫秒生成一个新的数据块。在batchInterval期间创建N个数据块,N = batchInterval/blockInterval。这些块被当前executor的块管理器分配给其他executors的块管理器。这之后,运行在driver上的网络输入跟踪器将被通知有关块位置以供进一步处理。
- 在driver上为在batchInterval期间创建的块创建一个RDD。在batchInterval期间生成的块是RDD的分区。在spark中每个分区是一个任务。blockInterval==batchInterval意味着创建一个单独的分区,并且可能在本地处理。
- 除非是非本地调度,否则在块中的map任务在executors中被处理(接收到的块和复制块的另一块),而不管块的间隔是多少。更大的blockInterval意味着更大的块。spark.locality.wait的高值增加了在本地节点上处理块的机会。在这两个参数之间找到一个平衡去确保本地处理更大的块。
- 替代依赖于batchInterval和blockInterval,你可以通过调用inputDstream.repartition(n)来定义分区的数量。这随机地重新刷新RDD中的数据以创建n个分区。是的,为了更大的并行性。虽然是以shuffle为代价的。driver的工作调度程序将RDD的处理作为一项工作。在特定的时间点,只有一个工作是活跃的。所以,如果一个工作正在执行,另一个工作正在排队。
- 如果你有两个dstreams,将会有两个RDD形成,并且将会创建两个作业,这两个作业将会被一个接一个地调度。为了避免这种情况,你可以结合两个dstreams。这将确保dstreams的两个RDD形成一个单一的unionRDD。然后这个unionRDD被认为是一个单一的作业。但是RDD的分区不受影响。
- 如果批处理时间超过batchInterval,那么显然接收器的内存将会开始填满,最终将会在抛出异常(最可能是BlockNotFoundException)。目前没有办法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。
5. Fault-tolerance Semantics 容错语义
在这节点,我们将会讨论Spark Streaming应用程序在发生故障时的行为。
5.1 Background 背景
为了明白Spark Streaming提供的语义,让我们记住基本Spark RDD的容错语义。
1. RDD是一个不可改变的,确定性可以重新计算的分布式数据集。每个RDD都记住在容错输入数据集上使用的确定性操作的lineage来创建它。
2. 如果RDD任何的分区由于workder节点故障而丢失,那么该分区可以通过使用lineage操作从原来的容错数据集重新计算。
3. 假设所有RDD转换都是确定性的,最终转换的RDD中的数据总是相同的,而不管Spark集群中的故障如何。
在容错文件系统(如HDFS或者S3)中Spark在数据上的操作。因此,从容错数据生成的全部RDD也是容错的。然而,对于Spark Streaming,情况并非如此,因为在大多数情况下通过网络接收数据(除了使用fileStream时)。为了实现所有生成的RDD相同的容错属性,接收到的数据在集群中的worker节点里的多个Spark executors之间复制(默认的复制因子为2)。这导致系统中有两种数据在发生故障时需要恢复:
1.Data received and replicated 数据接收和复制 —— 这些数据在单个worker节点的故障中仍然存在,因为它副本存在于其他节点之一上。
2.Data received but buffered for replication 数据接收但缓冲复制 —— 因为这不是复制的,唯一的方法来恢复这个数据是从来源source再次得到它。
进一步,这里有两种我们应该关注的故障:
1.Worker节点的故障 —— 运行executors的任何worker节点可以失败,并且这些节点的所有内存数据将会丢失。如果任何接收器运行在故障的节点上,那么它们缓冲的数据将会丢失。
2.Driver节点的故障 —— 如果运行Spark Streaming应用程序的driver节点失败,那么显然SparkContext会丢失,并且所有具有内存数据的executors都会丢失。
带有基本的知识,让我们了解Spark Streaming的容错语义。
5.2 Definitions 定义
streamings流系统的语义经常是通过系统处理每条记录的次数来获取的。系统可以在所有可能的操作条件下提供三种类型的保证(尽管失败等)。
1. 最多一次:每条记录将会被处理一次或者根本不处理。
2. 至少一次:每条记录将会被处理一次或多次。这比最多一次强壮,因为它确保没有数据会丢失。但是可能有重复。
3. 正好一次:每条记录将会正好被处理一次 —— 没有数据会丢失也没有数据会被执行多次。这显然是三个中最强壮的保证。
5.3 Basic Semantics 基本语义
在任何流处理系统中,广义来讲,处理数据有三个步骤。
1.Receiving the data接收数据:使用接收器或者其他从来源接收数据。
2.Transforming the data转换数据:使用DStream和RDD转换变换接收到的数据。
3.Pushing out the data推送数据:最终转换的数据被推送到外部系统,如文件系统、数据库、仪表盘等等。
如果流应用程序必须实现端到端正好一次的保证,那么每一步骤必须提供正好一次的保证。也就是说,每条记录必须实现只能被接收一次,只能被转换一次,并只能被推送到下流系统一次。
在Spark Streaming上下文中这些步骤的语义:
1. Receiving the data:不同输入来源提供不同的保证。详情参阅下一小节。
2. Transforming the data:由于RDD提供的保证,已经接收到的所有数据将只会被处理一次。即使出现故障,只要接收到的输入数据是可访问的,那么最终转换的RDD将总会有相同的内容。
3. Pusing out the data:输出操作默认确保至少一次语义,因为它依赖于输出操作的类型和下流系统的语义(支持事务与否)。
5.4 Semantics of Received Data 接收数据的语义
不同输入来源提供不同的保证,范围从至少一次到正好一次。阅读更多的细节。
5.4.1 With Files
如果所有的输入数据已经在容错文件系统如HDFS出现,Spark Streaming也可以从任何故障中恢复并处理所有数据。这给予正好一次语义,意味着所有数据将只能被处理一次无论发生什么故障。
5.4.2 With Receiver-based Sources
对应基于接收器的输入来源,容错语义取决于故障情况和接收器的类型。正如我们前面所讨论的,有两种类型的接收器:
1. Reliable Receiver 可靠的接收器 —— 这些接收器只有在确保接收到的数据已被复制之后才确认可靠的来源。如果这个接收器失败,来源将不会接收到缓冲数据(未复制)的确认。因此,如果接收器重新启用,来源将会再次发送数据,将不会有数据会由于故障而丢失。
2. Unreliable Receiver 不可靠的接收器 —— 这样的接收器不会发送确认,因此当由于worker或者driver发生故障时可能会丢失数据。
取决于使用什么类型的接收器,我们实现下面的语义。如果worker节点故障,那么可靠的接收器就不会有数据丢失。对于不可靠的接收器,接收到的数据但没有复制可能会丢失。如果driver节点故障,那么除了这些损失之外,所有在内存中接收和复制的过去数据都将丢失。这将会影响有状态转换的结果。
为了避免过去接收到的数据丢失,Spark1.2引入了预写日志,它在容错存储中保存接收到的数据。通过启用预写日志和可靠的接收器,不会有数据丢失。就语义而言,它至少提供一次保证。
下列表格总结了失败的语义:
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs | 不可靠的接收器会丢失缓存的数据 可靠的接收器零数据丢失 至少一次语义 | 不可靠的接收器丢失缓存数据 所有接收器会丢失过去的数据 没有定义语义 |
Spark 1.2 or later with write ahead logs | 可靠的接收器零数据丢失 至少一次语义 | 可靠的接收器和文件零数据丢失 至少一次语义 |
5.4.3 With Kafka Direct API
在Spark1.3,我们已经引入了一个新的Kafka Direct API,这可以确保所有Kafka数据正好被Spark Streaming一次接收到。除此之外,如果你实现正好一次的输出操作,你可实现端到端的正好一次保证。 Kafka Integration Guide.进一步讨论了这种方法。
5.4.4 Semantics of output operations 输出操作语义
输出操作(如foreachRDD)有至少一次语义,也就是说,转换的数据可能在worker故障的情况下多次地被写入外部实体。当使用saveAs***Files操作将文件保存到文件系统是可以接受的(因为文件只会被相同的数据覆盖),可能需要额外的努力才能实现一次语义。这里有两种方法。
- Idempotent updates 幂等更新:多个尝试总是写入相同的数据。例如,saveAs***Files总是将相同的数据写入到生成的文件。
- Transactional updates 事务更新:所有更新都是以事务方式进行的,因此更新只能以原子方式进行一次。一个途径执行它是这样的。
- 使用批处理时间(在foreachRDD中可用)和RDD分区索引创建一个标识符。该标识符唯一标识流应用程序中的blob数据。
- 使用这个标识符事务性地(即只是一次,原子地)用这个blob更新外部系统。也就是说,如果标识符还没提交,则以原子方式提交提交分区数据和标识符。否则,如果这已经提交,则跳过更新。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}