Spark高效数据分析04、RDD创建-LMLPHP

Spark高效数据分析04、RDD创建


RDD产生背景

RDD产生的目的是为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是 RDD 提出的动机
基于 MR 的数据迭代处理流程和基于 Spark 的数据迭代处理流程如图所示
 

RDD 的概念

RDD 的弹性

Demo-对list进行操作

package com.item.action

import org.apache.spark.{SparkConf, SparkContext}

object Demo7 {
  def main(args: Array[String]): Unit = {
    var conf =new SparkConf().setAppName("demo").setMaster("local")

    var sc =new SparkContext(conf)
    val rdd = sc.parallelize(List(2,8,6,3,3,7,9,5))

    rdd.distinct().foreach(i=>println(i+"-"))

    rdd.sortBy(x=>x,false).foreach(i=>println(i+"-"))

    rdd.filter(_>3).foreach(i=>println(i+"-"))

    rdd.map(_*2).foreach(i=>println(i+"-"))

  }
}

Demo-对单词数量进行分析

分析数据:

package com.item.action

import org.apache.spark.{SparkConf, SparkContext}

object Demo1 {
  def main(args: Array[String]): Unit = {
    //直接解压到桌面
    val filepath ="C:\\Users\\Administrator\\Desktop\\计应 spark机试考试素材\\计应 spark机试考试素材\\数据/spark1.txt"
    //设置配置文件·app名称以及【local本地文件读取】
    val sparkConf = new SparkConf().setAppName("demo1").setMaster("local")
    //程序的入口
    val sc = new SparkContext(sparkConf)
    //读取文件
    val strfile = sc.textFile(filepath)
    //去除首行
    var firstRow=sc.textFile(filepath).first()
    //将数据进行分割,并筛选出包含有A的数据
    val wordes = strfile.filter(!_.equals(firstRow)).flatMap(_.split("\t")).filter(_.contains("A"))
    //每个a累计一次
    val wordone = wordes.map(a=>(a,1))
    // 前面一个下划线表示累加数据,后面一个下划线表示新数据
    val result = wordone.reduceByKey(_+_)
    //输出位置
    result.saveAsTextFile("D://demo/demo1")
  }
}
07-20 18:50