Implementing WordCount (word frequency counting) on Windows
I. Writing a Scala program with the Spark libraries to implement WordCount
1. The SparkContext API
SparkContext is the main entry point for Spark functionality. It represents the connection to a Spark cluster and can be used to create RDDs (resilient distributed datasets), accumulators, and broadcast variables on that cluster. Only one active SparkContext is allowed per JVM; you must stop the active SparkContext before creating a new one.
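A minimal sketch of this create-use-stop lifecycle, assuming a local master (the object name, app name, and sample RDD here are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}

object ContextLifecycle {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ContextLifecycle").setMaster("local")
val sc = new SparkContext(conf) //the single active context in this JVM
println(sc.parallelize(1 to 10).count()) //use the context to create and act on an RDD
sc.stop() //stop it before creating another SparkContext
}
}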
2. The SparkConf configuration object
SparkConf holds the configuration for a Spark application and is used to set various Spark parameters as key-value pairs. Most of the time you create one with new SparkConf(), which also loads values from any spark.* Java system properties; in that case, parameters you set directly on the SparkConf take priority over the loaded system properties.
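A short sketch of setting parameters as key-value pairs; spark.executor.memory is a standard Spark property, but the value shown is only an example:

import org.apache.spark.SparkConf

val conf = new SparkConf() //also loads any spark.* system properties
.setAppName("WordCount")
.setMaster("local[2]") //run locally with two threads
.set("spark.executor.memory", "1g") //set directly, so it takes priority over system properties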
3. Scala version: WordCount
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {
def main(args: Array[String]): Unit = {
//create the Spark configuration object
val conf = new SparkConf()
//set the application name
conf.setAppName("WordCount")
//set the master URL
conf.setMaster("local")
//create the Spark context object
val sc = new SparkContext(conf)
//load the text file
val rdd1 = sc.textFile("E:\\studyFile\\data\\test.txt")
//flatten each line of rdd1 into words
val rdd2 = rdd1.flatMap(line => line.split(" "))
//map each word w => (w, 1)
val rdd3 = rdd2.map((_, 1))
val rdd4 = rdd3.reduceByKey(_ + _)
val r = rdd4.collect()
//iterate and print
r.foreach(println)
}
}
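For example, if test.txt held the two lines "hello world" and "hello spark" (hypothetical contents), the program would print the following pairs, in no guaranteed order:

(hello,2)
(world,1)
(spark,1)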
4. Java version: WordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WordCountJava2 {
public static void main(String[] args){
//create the conf object
SparkConf conf = new SparkConf();
conf.setAppName("WordCountJava2");
conf.setMaster("local");
//create the Java version of the SparkContext object
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd1 = sc.textFile("E:/studyFile/data/test.txt");
//first flatten each line into words
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
//called once per line; returns an iterator over its words
public Iterator<String> call(String s) throws Exception {
List<String> list = new ArrayList<String>();
String[] arr = s.split(" ");
for(String ss:arr){
list.add(ss);
}
return list.iterator();
}
});
//map each word: word ===> (word, 1)
JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
//merge the two counts for the same key
return v1+v2;
}
});
List<Tuple2<String,Integer>> list=rdd4.collect();
for(Tuple2<String,Integer> t :list){
System.out.println(t._1+":"+t._2);
}
}
}
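This Java version computes the same counts as the Scala program; because of the t._1+":"+t._2 formatting, each pair is printed as word:count rather than as a tuple.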
5. Submitting the job with spark-submit and running it locally
1) Export the jar package
2) Run the job with spark-submit:
$>spark-submit --master local --name WordCount --class com.jd.spark.scala.WordCountDemoScala spark-daemon1-1.0-SNAPSHOT.jar /home/centos/test.txt
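The -SNAPSHOT suffix in the jar name suggests a Maven project; if that assumption holds, the jar can typically be exported by running the following in the project root (adjust for your own build tool):

$>mvn package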
6. Submitting the job to run on a fully distributed Spark cluster (only HDFS is required)
1) Start the HDFS part of the Hadoop cluster
$>start-dfs.sh
2) Put the input file into HDFS
$>hdfs dfs -put test.txt /user/centos/hadoop/
3) Run the job with the spark-submit command
$>spark-submit --master spark://s11:7077 --name WordCount --class com.jd.spark.scala.WordCountDemoScala spark-daemon1-1.0-SNAPSHOT.jar hdfs://s11:8020/user/centos/hadoop/test.txt
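Note that collect() ships the results back to the driver, so the counts are printed on the console of the submitting machine. To persist them to HDFS instead, the job could replace the collect-and-print steps with a call like the following (the output path is hypothetical):

rdd4.saveAsTextFile("hdfs://s11:8020/user/centos/hadoop/out")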