Introduction to Core Spark Concepts
- driver program:
- 在集群上启动一系列的并行操作
- 包含应用的main函数,定义集群上的分布式数据集,操作数据集
- 通过SparkContext对象访问spark,这表示了与计算集群的连接
- executors:
- the place to run the operations
- Spark automatically takes ur function and ships it to executor nodes.
Programming with RDDs
- RDD: spark's core abstraction for working with data.
- RDD简单来说就是元素的分布式集合
- 在Spark中所有的工作都可以表示成创建一个新的RDDs,转换已有的RDDs,或者是在RDDs上运行operations
RDD Basics
- An immutable distributed collection of objects.
- 每个RDD被split成多个partitions,每个partition可能在cluster的不同节点上被计算
- RDD的创建:
- loading一个外部数据集
- distributing对象集合(eg: a list or set)
- transformations:从原RDD创建一个新的RDD
- actions:基于RDD计算一个result,这个结果要么返回给driver program,要么存储到外部存储系统(eg: HDFS)
- RDD.persist():由于缺省情况下,每次运行action的时候RDDs是重新计算的。如果对RDD进行persist,那么该RDD会persist到内存(或disk),下次action的时候可以reuse。
RDD操作:(区分这两种操作的原因是Spark的计算是lazy fashion的
Creating RDDs
- parallelize()
val lines = sc.parallelize(List("pandas", "I like pandas"))
- textFile()
val lines = sc.textFile("/path/to/README.md")
RDD Operations
- Transformation & Action
Transformations
- Compute lazily
- 没有改变原RDD(immutable),而是生成了新的RDD,spark会保存这一系列依赖关系(lineage)
Actions
- Actually do something with our dataset
Passing Functions to Spark
- Scala: we can pass in functions defined inline, references to methods, or static functions
- Scala: 我们所传送的函数和其中的数据引用需要被序列化(实现Java的Serializable接口)
- 如果我们pass一个对象中的函数,或者包含了对象中的字段的引用(eg: self.field),spark会把整个对象发送给worker nodes,这会远大于你所需要的信息。并且如果你的对象不能持久化(pickle in python)的话,会导致是你的程序失败。举一个python的例子:
错误示范如下:
正确示范:(提取对象中你所需的字段为局部变量,然后传进去)
- 同样的,对scala我们也要尽量避免上述情况,而且要注意的是在scala中不需要显示的self.或者this.,所以这种情况显得很不明显,但仍然要注意。举个栗子
如果在scala中出现了NotSerializableException,那么多半是因为引用了一个不可序列化的类中的变量或字段。所以,传送一个局部的可序列化的变量或函数才是安全的。
- Any code that is shared by RDD transformations must always be serializable.
Common Transformations and Actions
Basic RDDs
- 我们首先介绍基本的RDD操作,它们可以执行在所有RDDs上而不用管数据
Element-wise transformations
- map() and filter()
- flatMap(): 为每一个输入元素产生多个输出元素。返回的是一个迭代器iterator
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
Psedudo set operations
- 一些简单的集合操作:(需要RDDs是同一类型的)
- RDD1.distinct() --> 十分昂贵的操作,需要shuffle all data over the network
- RDD1.union(RDD2) --> 最简单的集合操作,会保留原RDD中的重复值
- RDD1.intersection(RDD2) --> 需要去重(来识别共同元素),因而也需要shuffle
- RDD1.substract(RDD2) --> perform shuffle
- RDD1.cartesian(RDD2) --> returns all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD .十分昂贵
- 为什么叫psedudo即假的集合操作呢,因为这里的集合丢失了一个重要特性:uniqueness即元素的唯一性。因为我们经常有duplicates
Actions
- reduce() & fold() :都需要返回值和RDD中的元素保持同一类型。
fold()接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果.
val sum = rdd.reduce((x, y) => x + y)
- aggregate(): frees us from the constraint of having the return be the same types as the RDD we are working on.
aggregate的函数原型:
def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U
可以看到,(zeroValue: U)是给定的一个初值,后半部分有两个函数,seqOp相当于是在各个分区里进行的聚合操作,它支持(U, T) => U,也就是支持不同类型的聚合。comOp是将sepOp后的结果聚合,此时的结果全部是U类,也就是只能进行同构聚合。
一个经典的例子是求平均值。即先用seqOp求出各个分区中的sum和个数,再将seqOp后的结果聚合得到总的sum和总的个数。
- collect(): 返回整个RDD中的内容,常用于单元测试,因为它需要你的整个数据集能够fit on a single machine.
- take(n): 返回RDD中的n个元素,并且试图最小化所访问的partition数,所以它可能会返回一个biased collection。
- takeSample(withReplacement, num, seed): allows us to take a sample of our data either with or without replacement.
- foreach(): 可以允许我们在每个元素上执行操作or计算,而不需要把元素送回driver
Converting Between RDD Types
- 一些functions只在某些特定类型RDD上可用。比如mean(), variance()只用于numericRDDs, join()只用于key/value pair RDDs.
- 在scala和Java中,这些方法未在标准RDD类中定义,因此为了访问这些附加的功能,我们需要确保我们得到了正确的specialized class。
Scala
- 在scala中。RDDs的转换可以通过使用隐式转换(using implicit conversions)来自动进行。
- 看一段RDD.scala源码中的介绍
- 关于scala隐式转换: 当对象调用类中不存在的方法或成员时,编译器会自动将对象进行隐式转换
- 隐式转换带来的confusion:当你在RDD上调用mean()这样的方法时,你会发现在RDDclass 的Scaladocs中找不到mean()方法,但是该方法能成功调用是由于实现了RDD[Double]到DoubleRDDFunctions的隐式转换。
Persistence(Caching)
- As discussed earlier, Spark RDDs是惰性求值的,如果我们想要多次使用同一个RDD的话,Spark通常会每次都重新计算该RDD和它所有的依赖。这对于迭代算法是十分昂贵的。
- 一个比较直观的例子如下,每次action的时候都会重新计算:
- 为了避免多次重复计算同一个RDD,我们可以让Spark来persist数据。这样的话,计算该RDD的那个节点会保存它们的partition。
- 如果有数据持久化的节点fail掉了,Spark会在需要的时候重新计算丢失的partitons。当然我们也可以通过在多个节点保存副本的方式来避免节点故障时的slowdown。
- Spark有很多levels of persistence供选择。
Level Space Used CPU time In Memory On Disk Comments MEMORY_ONLY
High Low Y N MEMORY_ONLY_SER
Low High Y N MEMORY_AND_DISK
High Medium Some Some Spils to disk if there is too much data to fit in memory. MEMORY_AND_DISK_SER
Low High Some Some Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.
DISK_ONLY
Low High N Y - 在Java和scala中,缺省的情况下persist()回将未序列化的对象数据保存在JVM的堆中。
- 如果你试图在内存中cache过多的数据,Spark将会自动驱逐旧的partitions,使用最少最近使用(Least Recently Used, LRU)缓存策略。对于MEMORY_ONLY level,下次访问的时候会重新计算这些被驱逐的分片。
- 由于Spark'的各种机制,无论使用哪种level,你都可以不用担心job breaking。但是缓存不必要的数据将会导致有用数据被驱逐,从而增加重计算的时间。
- Spark提供了unpersist()方法可以让你手工地将RDD移除缓存。
Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.