什么是Spark

Apache Spark是一个开源集群运算框架, 相对于Hadoop的MapReduce会在运行完工作后将中介数据存放到磁盘中,Spark使用了存储器内运算技术,能在数据尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍,即便是运行程序于硬盘时,Spark也能快上10倍速度。Spark允许用户将数据加载至集群存储器,并多次对其进行查询,非常适合用于机器学习算法。

为什么需要Spark

在Spark 之前,我们已经有了Hadoop,Hadoop 作为大数据时代企业首选技术,方兴未艾,我们为什么还需要Spark 呢?

Hadoop 对某些工作并不是最优的选择:

中间输出到磁盘,会产生较高的延迟。

缺少对迭代运算的支持。

Spark项目构成要素

Spark核心和弹性分布式数据集(RDDs)

Spark核心是整个项目的基础,提供了分布式任务调度,调度和基本的I/O功能。而其基础的程序抽象则称为弹性分布式数据集(RDDs),是一个可以并型操作、有容错机制的数据集合。 RDDs可以通过引用外部存储系统的数据集创建(例如:共享文件系统、HDFS、HBase或其他 Hadoop 数据格式的数据源)。或者是通过在现有RDDs的转换而创建(比如:map、filter、reduce、join等等)。

RDD抽象化是经由一个以Scala, Java, Python的语言集成API所呈现,简化了编程复杂性,应用程序操纵RDDs的方法类似于操纵本地端的数据集合。

Spark SQL

Spark SQL在Spark核心上带出一种名为SchemaRDD的数据抽象化概念,提供结构化和半结构化数据相关的支持。Spark SQL提供了领域特定语言,可使用Scala、Java或Python来操纵SchemaRDDs。它还支持使用使用命令行界面和ODBC/JDBC服务器操作SQL语言。在Spark 1.3版本,SchemaRDD被重命名为DataFrame

MLlib

MLlib是Spark上分布式机器学习框架。Spark分布式存储器式的架构比Hadoop磁盘式的Apache Mahout快上10倍,扩充性甚至比Vowpal Wabbit要好。 MLlib可使用许多常见的机器学习和统计算法,简化大规模机器学习时间,其中包括:

汇总统计、相关性、分层抽样、假设检定、随机数据生成

分类与回归:支持向量机、回归、线性回归、决策树、朴素贝叶斯

协同过滤:ALS

分群:k-平均算法

维度缩减:奇异值分解(SVD),主成分分析(PCA)

特征提取和转换:TF-IDF、Word2Vec、StandardScaler

最优化:随机梯度下降法(SGD)、L-BFGS

Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入Hadoop的生态系统,以弥补缺失MapReduce的不足

Spark相比Hadoop MapReduce的优势如下:

  1. 中间结果输出

    基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而

    这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果。

    Spark将执行模型抽象为通用的有向无环图执行计划(DAG),这可以将多Stage的任务串联或者并行执行,而无须将Stage中间结果输出到HDFS中。类似的引擎包括Dr yad、Tez。
  2. 数据格式和内存布局

    Spark抽象出分布式内存存储结构弹性分布式数据集RDD,进行数据的存储。RDD能支持粗粒度写操作,但对于读取操作,RDD可以精确到每条记录,这使得RDD可以用来作为分布式索引

    Spark的特性是能够控制数据在不同节点上的分区,用户可以自定义分区策略,如Hash分区等.Shark和Spark SQL在Spark的基础上实现了列存储和列存储压缩
  3. 执行策略

    Spark任务在shuffle中不是所有情景都需要排序,所以支持基于Hash的分布式聚合,调度中采用更为通用的任务执行计划图(DAG),每一轮次的输出结果在内存缓存
  4. 任务调度的开销

    传统的MapReduce系统,是为了运行长达数小时的批量作业而设计的,在某些极端的情况下,提交一个任务的延迟非常高

    Spark采用了事件驱动的类库AKKA来启动任务,通过线程池复用线程来避免进程或线程启动和切换开销

Spark的"HelloWorld"

Spark 提出了一种分布式的数据抽象,称为 RDDs(resilient distributed datasets,弹性分布式数据集),是一个可并行处理且支持容错的数据集,同时,也是一个受限的数据集,RDDs是一个只读的、记录分区的数据集,仅支持transformationaction两种操作,这些受限,使得RDDs可以以较小的成本实现高容错性、可靠性。

RDDs有两种创建方式,一种是从外部数据源创建,另一种是从其它RDDs transform而来。transformation 是对RDDs进行确定性的操作,输入是RDDs,输出RDDs。action 是向应用程序返回值或者将结果写到外部存储。

最后,transformation具有 LAZY 的特点,当在RDDs上进行一次transformation时,并不会立即执行,只会在进行action时,前面的transformation才会真正执行。这个特点,被 Spark 用来优化整个工作链路,可以有效减少网络沟通、传输时间(大数据处理过程中,网络传输可以说是最大的性能杀手),从而大幅提高运行速度。

举个例子,我们具有如下代码:

lines = spark.textFile("hdfs://...")                                  // 第一行,读取外部数据源,生成一个RDDs;
errors = lines.filter(_.startsWith("ERROR")) // 第二行,在RDDs lines上做了一次transformation运算 filter,取出以”ERROR” 开头的所有行,得到一个新的RDDs errors;
errors.cache() // 第三行,缓存RDDs;
errors.count() // 第四行,在errors 上执行action,得到errors的行数。

整个过程中,只有在执行count()时,才会真正开始读取数据、过滤、缓存、计算行数。

上述整个过程,称为lineage,根据lineage,可以从具体的物理数据,计算出相应的结果。在Spark中,实现容错就是根据 lineage,当某个分区失败后,重新进行一次计算即可,而不是采用检查点、回滚等代价高昂的方式。同时,lineage 是Spark用来优化计算流程的依据。

可以参考此处

05-08 15:46