Writing a Spark Program in Scala to Find the Top N Stay Durations of Mobile Users at Base Stations

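1. Requirement: compute the top N stay durations from mobile base station logs

Mobile phones can communicate because there are base stations all over the country. As soon as a phone is switched on, it tries to connect to a nearby base station, and every connection and disconnection is recorded in the logs on the mobile operator's base station servers.

We do not know a user's exact position, but the position of the base station tells us roughly which area the user is in, and businesses can use that location information to target recommendations and advertisements.

To keep things easy to follow, we simulated some mobile user log data from the base stations. Each record has 4 fields: phone number, timestamp (in yyyyMMddHHmmss form), base station id, and connection type (1 means a connection was established, 0 means it was closed).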

User log of base station A (file 19735E1C66.log):

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

User log of base station B (file DDE7970F68.log):

18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0
18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0

User log of base station C (file E549D940E0.log):

18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0
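
The four-field log records above map naturally onto a small case class. The following is a minimal sketch and not part of the original program: the names `LogRecord`, `LogParser` and `parse` are made up for illustration, and the choice to silently drop malformed lines is an assumption.

```scala
// Hypothetical helper, not part of the original program.
case class LogRecord(mobile: String, timestamp: Long, stationId: String, connected: Boolean)

object LogParser {
  // Returns None for lines that do not have exactly four well-formed fields.
  def parse(line: String): Option[LogRecord] =
    line.trim.split(",") match {
      case Array(mobile, ts, station, tp)
          if ts.nonEmpty && ts.forall(_.isDigit) && (tp == "0" || tp == "1") =>
        // "1" means a connection was established, "0" means it was closed.
        Some(LogRecord(mobile, ts.toLong, station, tp == "1"))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parse("18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1"))
    println(parse("not,a,valid"))
  }
}
```

Returning an `Option` keeps a single bad line from failing the whole job; in a Spark job, `lines.flatMap(line => LogParser.parse(line))` would keep only the valid records.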

Below is the base station table (file loc_info.txt). It has 4 fields: base station id, longitude, latitude, and the signal type (the signal level, for example 2G, 3G or 4G):

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
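
Because the base station table is tiny, it can also be held as an in-memory lookup map. The sketch below is an illustration under assumptions: `Station`, `StationTable`, `parse` and `index` are hypothetical names, and the coordinates are parsed as `Double` even though the program in this article keeps them as strings.

```scala
// Hypothetical helper, not part of the original program.
case class Station(id: String, longitude: Double, latitude: Double, signalType: String)

object StationTable {
  // Parses one "id,longitude,latitude,signalType" line.
  def parse(line: String): Station = {
    val fields = line.trim.split(",")
    require(fields.length == 4, "expected 4 fields but got " + fields.length)
    Station(fields(0), fields(1).toDouble, fields(2).toDouble, fields(3))
  }

  // Builds a lookup map keyed by base station id.
  def index(lines: Seq[String]): Map[String, Station] =
    lines.map(parse).map(s => s.id -> s).toMap

  def main(args: Array[String]): Unit = {
    val table = index(Seq(
      "9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6",
      "CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6",
      "16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6"))
    println(table.get("CC0710CC94ECC657A8561DE549D940E0"))
  }
}
```

In Spark, a table this small is often shipped to the executors with `SparkContext.broadcast` instead of being joined through a shuffle; the program in this article uses a regular join.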

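Based on this data, compute for each phone number the 2 base station locations (longitude and latitude) where it stayed the longest.

Approach:

(1) Read the log data and split the fields;
(2) Arrange the fields into a Tuple with (phone number, base station id) as the key and the time as the value;
(3) Aggregate by key, summing the time;
(4) Re-key the data by base station id, with (phone number, time) as the value, so that it can be joined with the base station table later;
(5) Read the base station data and split the fields;
(6) Arrange the fields into a Tuple with the base station id as the key and the station's (longitude, latitude) as the value;
(7) Join the two sets of Tuples;
(8) Group the joined result by phone number;
(9) Convert each group to a List, sort it by time, reverse it, and take the top 2;
(10) Write the result to HDFS.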
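
Steps (1) to (9) can be tried out on the sample data with plain Scala collections before any Spark code is written. This is only a sketch: `TopNSketch` and `topStations` are hypothetical names, the data is embedded in the code instead of being read from files, and step (10) is left out.

```scala
object TopNSketch {
  val logLines: Seq[String] = Seq(
    "18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1",
    "18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1",
    "18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0",
    "18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0",
    "18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1",
    "18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1",
    "18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0",
    "18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0",
    "18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1",
    "18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1",
    "18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0",
    "18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0",
    "18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1",
    "18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1",
    "18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0",
    "18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0",
    "18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1",
    "18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0",
    "18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1",
    "18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0")

  val stationLines: Seq[String] = Seq(
    "9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6",
    "CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6",
    "16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6")

  // Per phone number, the n stations with the largest accumulated value,
  // as (stationId, total, longitude, latitude).
  def topStations(logs: Seq[String], stations: Seq[String], n: Int)
      : Map[String, List[(String, Long, String, String)]] = {
    // Steps 1-2: split each line and key it by (mobile, stationId);
    // connect events count as negative, disconnect events as positive.
    val keyed = logs.map { line =>
      val f = line.split(",")
      val signed = if (f(3) == "1") -f(1).toLong else f(1).toLong
      ((f(0), f(2)), signed)
    }
    // Step 3: sum the signed timestamps per key.
    val totals: Map[(String, String), Long] =
      keyed.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2).sum }
    // Steps 5-6: station id -> (longitude, latitude).
    val coords: Map[String, (String, String)] = stations.map { line =>
      val f = line.split(",")
      f(0) -> ((f(1), f(2)))
    }.toMap
    // Steps 4 and 7: attach the coordinates (an inner join on station id).
    val joined = for {
      ((mobile, station), total) <- totals.toList
      (x, y) <- coords.get(station).toList
    } yield (mobile, (station, total, x, y))
    // Steps 8-9: group by mobile, sort descending by total, keep the first n.
    joined.groupBy(_._1).map { case (mobile, rows) =>
      mobile -> rows.map(_._2).sortBy(row => -row._2).take(n)
    }
  }

  def main(args: Array[String]): Unit =
    topStations(logLines, stationLines, 2).foreach(println)
}
```

Sorting by the negated total gives the same order as sorting ascending and reversing, apart from the relative order of ties.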

2. Prepare the test data

1) Mobile user logs (the 3 log files above).

2) Base station data (the loc_info.txt file above).

3. The pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuebusi</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>

        <!-- Dependency versions are managed in one place here -->
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <!-- Scala language core library -->
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <!-- Spark core library -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <!-- Hadoop client, used to access HDFS -->
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- scala-maven-plugin: Maven plugin that compiles Scala code -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- maven-compiler-plugin: Maven plugin that compiles Java code -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <!-- Configuration of the Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Configuration of the Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- maven-shade-plugin: Maven plugin that builds the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4. Write the Scala program

Create an object named MobileLocation under src/main/scala/.

Complete code of MobileLocation.scala:

package com.xuebusi.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * For each phone number, find the 2 base stations where it stayed the longest.
  * Created by SYJ on 2017/1/24.
  */
object MobileLocation {

  def main(args: Array[String]): Unit = {
    /**
      * Create the SparkConf.
      *
      * Notes:
      * To make debugging in IDEA easy, the master is set to local here,
      * i.e. the Spark program runs on the local machine.
      * This has one drawback: reading data from HDFS on Linux from a
      * Windows machine may throw an exception, because reading the data
      * relies on some native libraries on Windows.
      *
      * The same problem appears when running MapReduce programs from
      * Eclipse on Windows; it does not occur on Linux or macOS.
      *
      * Hadoop compression and decompression use native libraries written
      * in C or C++, and such libraries are not cross-platform, so debugging
      * MapReduce programs on Windows requires installing them first.
      *
      * Installing a Linux virtual machine with a graphical desktop on
      * Windows is recommended; debugging there avoids these problems.
      */
    // Run locally
    val conf: SparkConf = new SparkConf().setAppName("MobileLocation").setMaster("local")
    // Create the SparkConf without a master, to run on a cluster
    //val conf: SparkConf = new SparkConf().setAppName("MobileLocation")

    // Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)

    // Read the log data from the file system
    val lines: RDD[String] = sc.textFile(args(0))

    /**
      * Split the data.
      * The commented-out line chains two map calls, which is not recommended;
      * the same work can be done in a single map.
      */
    //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), arr(3)))

    // Split each line and build the tuple in a single map
    val splited = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val mobile: String = fields(0)
      //val time: Long = fields(1).toLong
      val lac: String = fields(2)
      val tp: String = fields(3)
      // Connect events (type 1) are negated, so summing yields disconnect minus connect.
      // Note: the values are yyyyMMddHHmmss numbers, so the sum is a ranking score, not seconds.
      val time: Long = if (tp == "1") -fields(1).toLong else fields(1).toLong
      // Return the fields as a tuple:
      // ((phone number, base station id), time)
      ((mobile, lac), time)
    })

    // Aggregate by key
    val reduced: RDD[((String, String), Long)] = splited.reduceByKey(_ + _)

    // Re-key the tuples so they can be joined with the base station table
    val lacAndMobileTime = reduced.map(x => {
      // (base station id, (phone number, time))
      (x._1._2, (x._1._1, x._2))
    })

    // Read the base station data
    val lacInfo: RDD[String] = sc.textFile(args(1))

    // Split the data to prepare for the join
    val splitedLacInfo = lacInfo.map(line => {
      val fields: Array[String] = line.split(",")
      val id: String = fields(0) // base station id
      val x: String = fields(1)  // base station longitude
      val y: String = fields(2)  // base station latitude
      /**
        * Only key-value data can be joined, so return a tuple with the
        * base station id as key and the coordinates as value:
        * (base station id, (longitude, latitude))
        */
      (id, (x, y))
    })

    // join
    // Returns: RDD[(base station id, ((phone number, time), (longitude, latitude)))]
    val joined: RDD[(String, ((String, Long), (String, String)))] = lacAndMobileTime.join(splitedLacInfo)
    //ArrayBuffer((CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))), (CC0710CC94ECC657A8561DE549D940E0,((18611132889,1900),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))
    //System.out.println(joined.collect().toBuffer)

    // Group by phone number
    val groupedByMobile = joined.groupBy(_._2._1._1)
    //ArrayBuffer((18688888888,CompactBuffer((CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))), (18611132889,CompactBuffer((CC0710CC94ECC657A8561DE549D940E0,((18611132889,1900),(116.303955,40.041935))), (16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))))))
    //System.out.println(groupedByMobile.collect().toBuffer)

    /**
      * Convert to a List, sort by time, reverse, then take the top 2
      */
    val result = groupedByMobile.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2))
    //ArrayBuffer((18688888888,List((16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645))))), (18611132889,List((16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645))))))
    //System.out.println(result.collect().toBuffer)

    // Write the result to the file system
    result.saveAsTextFile(args(2))

    // Release resources
    sc.stop()
  }
}
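
The program subtracts the timestamps as plain numbers. Because they are yyyyMMddHHmmss values, the difference (for example 87600 for a stay from 08:24:00 to 17:00:00) is not a number of seconds, and a short stay that crosses an hour boundary can score higher than a longer one that does not. If real durations are wanted, the timestamps can be converted to epoch seconds first. The following is a sketch of that idea, not the author's method: `StayDuration` and its functions are hypothetical names, and treating the timestamps as UTC is an assumption.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object StayDuration {
  // Converts a yyyyMMddHHmmss string to seconds since the epoch.
  // SimpleDateFormat is not thread-safe, so a new instance is created per call.
  def toEpochSeconds(timestamp: String): Long = {
    val format = new SimpleDateFormat("yyyyMMddHHmmss")
    format.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: zone-less wall-clock times
    format.setLenient(false)
    format.parse(timestamp).getTime / 1000
  }

  // Same sign trick as the program: connect events negative, disconnect positive,
  // so that summing all events of one key yields the total stay in seconds.
  def signedSeconds(timestamp: String, connectionType: String): Long =
    if (connectionType == "1") -toEpochSeconds(timestamp) else toEpochSeconds(timestamp)

  // Numeric difference of the raw timestamps, as the program computes it.
  def rawDifference(start: String, end: String): Long = end.toLong - start.toLong

  def main(args: Array[String]): Unit = {
    val seconds = signedSeconds("20160327082400", "1") + signedSeconds("20160327170000", "0")
    println("seconds: " + seconds)
    println("raw difference: " + rawDifference("20160327082400", "20160327170000"))
  }
}
```

In the program this would mean replacing `fields(1).toLong` with a call such as `StayDuration.toEpochSeconds(fields(1))`; the values in the sample output would then change, since they were produced with the raw subtraction.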

5. Local testing

In IDEA, edit the run configurations and click the "+" button to add a new configuration. Choose "Application", then select the class that contains the main method.

Give the configuration a name, and in the Program arguments box enter 3 arguments: the two input directories followed by the output directory.
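
The program reads `args(0)`, `args(1)` and `args(2)` without checking them, so a missing argument only shows up as an `ArrayIndexOutOfBoundsException`. A small guard could make the failure clearer. This is a sketch only; `ArgsCheck`, `JobArgs` and `parseArgs` are hypothetical names that do not appear in the original program.

```scala
object ArgsCheck {
  // The three paths the job expects, in the order they are passed.
  case class JobArgs(logDir: String, stationDir: String, outputDir: String)

  // Returns the parsed arguments, or a usage message when the count is wrong.
  def parseArgs(args: Array[String]): Either[String, JobArgs] = args match {
    case Array(logs, stations, out) => Right(JobArgs(logs, stations, out))
    case _ =>
      Left("Usage: MobileLocation <log input dir> <station input dir> <output dir>")
  }

  def main(args: Array[String]): Unit = parseArgs(args) match {
    case Right(jobArgs) => println("Running with " + jobArgs)
    case Left(usage)    => System.err.println(usage)
  }
}
```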

Run the program locally. The SparkContext fails to initialize because the JVM heap is too small:

17/01/24 17:17:58 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8. Please use a larger heap size.
at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:198)
at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:180)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
at com.xuebusi.spark.MobileLocation$.main(MobileLocation.scala:37)
at com.xuebusi.spark.MobileLocation.main(MobileLocation.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

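One fix is to add a line to the program, before the SparkContext is created:
conf.set("spark.testing.memory", "536870912") // 512 MB; any value of at least 471859200 bytes (the minimum named in the error) works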

That approach is not very flexible, so here we pass the setting to the JVM instead. Edit the run configuration and add either "-Xmx512m" or "-Dspark.testing.memory=536870912" (512 MB) in the VM options box.
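
The threshold in the error message, 4.718592E8 bytes, equals 1.5 times 300 MB, which matches the amount of memory Spark 1.6 reserves for itself. The sketch below reproduces that arithmetic so the available heap can be checked up front; `MemoryCheck` and its members are hypothetical names, not Spark API.

```scala
object MemoryCheck {
  // Spark 1.6 reserves 300 MB and requires 1.5 times that amount as system memory.
  val reservedBytes: Long = 300L * 1024 * 1024
  val minSystemMemory: Long = (reservedBytes * 1.5).toLong

  // True when the given amount of memory passes Spark's startup check.
  def isEnough(systemMemory: Long): Boolean = systemMemory >= minSystemMemory

  def main(args: Array[String]): Unit = {
    val available = Runtime.getRuntime.maxMemory
    println("required=" + minSystemMemory + " available=" + available +
      " ok=" + isEnough(available))
  }
}
```

The heap size from the stack trace, 259522560 bytes, fails this check, while 536870912 bytes (512 MB) passes it.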

The program now runs and produces its log output and result.

6. Submit to the Spark cluster

Running locally and running on the cluster differ in one line of code: the SparkConf must be created without setMaster("local"), as in the commented-out line in the program, so that the master passed to spark-submit is used.

Package the program into a jar with the Maven plugin.

Upload the jar to a server in the Spark cluster.

Upload the test data to HDFS as well.

Run the command:

/root/apps/spark/bin/spark-submit \
--master spark://hadoop01:7077,hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 7 \
--class com.xuebusi.spark.MobileLocation \
/root/spark-1.0-SNAPSHOT.jar \
hdfs://hadoop01:9000/mobile/input/mobile_logs \
hdfs://hadoop01:9000/mobile/input/loc_logs \
hdfs://hadoop01:9000/mobile/output

7. Log output of the run

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
// :: INFO SparkContext: Running Spark version 1.6.
// :: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
// :: INFO SecurityManager: Changing view acls to: root
// :: INFO SecurityManager: Changing modify acls to: root
// :: INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
// :: INFO Utils: Successfully started service 'sparkDriver' on port .
// :: INFO Slf4jLogger: Slf4jLogger started
// :: INFO Remoting: Starting remoting
// :: INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:49399]
// :: INFO Utils: Successfully started service 'sparkDriverActorSystem' on port .
// :: INFO SparkEnv: Registering MapOutputTracker
// :: INFO SparkEnv: Registering BlockManagerMaster
// :: INFO DiskBlockManager: Created local directory at /tmp/blockmgr-6c4988a1-3ad2-49ad-8cc2-02390384792b
// :: INFO MemoryStore: MemoryStore started with capacity 517.4 MB
// :: INFO SparkEnv: Registering OutputCommitCoordinator
// :: INFO Utils: Successfully started service 'SparkUI' on port .
// :: INFO SparkUI: Started SparkUI at http://192.168.71.11:4040
// :: INFO HttpFileServer: HTTP File server directory is /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/httpd--8d9c-4ea7-9fca-5caf8c712f86
// :: INFO HttpServer: Starting HTTP Server
// :: INFO Utils: Successfully started service 'HTTP file server' on port .
// :: INFO SparkContext: Added JAR file:/root/spark-1.0-SNAPSHOT.jar at http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar with timestamp 1485286086076
// :: INFO Executor: Starting executor ID driver on host localhost
// :: INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port .
// :: INFO NettyBlockTransferService: Server created on
// :: INFO BlockManagerMaster: Trying to register BlockManager
// :: INFO BlockManagerMasterEndpoint: Registering block manager localhost: with 517.4 MB RAM, BlockManagerId(driver, localhost, )
// :: INFO BlockManagerMaster: Registered BlockManager
// :: INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 153.6 KB, free 153.6 KB)
// :: INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 167.5 KB)
// :: INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost: (size: 13.9 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from textFile at MobileLocation.scala:
// :: INFO FileInputFormat: Total input paths to process :
// :: INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 86.4 KB, free 253.9 KB)
// :: INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 19.3 KB, free 273.2 KB)
// :: INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost: (size: 19.3 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from textFile at MobileLocation.scala:
// :: INFO FileInputFormat: Total input paths to process :
// :: INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
// :: INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
// :: INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
// :: INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
// :: INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
// :: INFO SparkContext: Starting job: saveAsTextFile at MobileLocation.scala:
// :: INFO DAGScheduler: Registering RDD (map at MobileLocation.scala:)
// :: INFO DAGScheduler: Registering RDD (map at MobileLocation.scala:)
// :: INFO DAGScheduler: Registering RDD (map at MobileLocation.scala:)
// :: INFO DAGScheduler: Registering RDD (groupBy at MobileLocation.scala:)
// :: INFO DAGScheduler: Got job (saveAsTextFile at MobileLocation.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (saveAsTextFile at MobileLocation.scala:)
// :: INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage )
// :: INFO DAGScheduler: Missing parents: List(ShuffleMapStage )
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 KB, free 277.0 KB)
// :: INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.2 KB, free 279.2 KB)
// :: INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost: (size: 2.2 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 1.0 with tasks
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 283.1 KB)
// :: INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 285.3 KB)
// :: INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID , localhost, partition ,ANY, bytes)
// :: INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost: (size: 2.2 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 0.0 with tasks
// :: INFO Executor: Running task 0.0 in stage 1.0 (TID )
// :: INFO Executor: Fetching http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar with timestamp 1485286086076
// :: INFO Utils: Fetching http://192.168.71.11:58135/jars/spark-1.0-SNAPSHOT.jar to /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/userFiles-1790c4be-0a0d-45fd-91f9-c66c49e765a5/fetchFileTemp1508611941727652704.tmp
// :: INFO Executor: Adding file:/tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/userFiles-1790c4be-0a0d-45fd-91f9-c66c49e765a5/spark-1.0-SNAPSHOT.jar to class loader
// :: INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/loc_logs/loc_info.txt:0+171
// :: INFO Executor: Finished task 0.0 in stage 1.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID , localhost, partition ,ANY, bytes)
// :: INFO Executor: Running task 0.0 in stage 0.0 (TID )
// :: INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/19735E1C66.log:0+248
// :: INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID ) in ms on localhost (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: ShuffleMapStage (map at MobileLocation.scala:) finished in 5.716 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set(ShuffleMapStage )
// :: INFO DAGScheduler: waiting: Set(ShuffleMapStage , ShuffleMapStage , ResultStage )
// :: INFO DAGScheduler: failed: Set()
// :: INFO Executor: Finished task 0.0 in stage 0.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID , localhost, partition ,ANY, bytes)
// :: INFO Executor: Running task 1.0 in stage 0.0 (TID )
// :: INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID ) in ms on localhost (/)
// :: INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/DDE7970F68.log:0+496
// :: INFO Executor: Finished task 1.0 in stage 0.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID , localhost, partition ,ANY, bytes)
// :: INFO Executor: Running task 2.0 in stage 0.0 (TID )
// :: INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID ) in ms on localhost (/)
// :: INFO HadoopRDD: Input split: hdfs://hadoop01:9000/mobile/input/mobile_logs/E549D940E0.log:0+496
// :: INFO Executor: Finished task 2.0 in stage 0.0 (TID ). bytes result sent to driver
// :: INFO DAGScheduler: ShuffleMapStage (map at MobileLocation.scala:) finished in 6.045 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set()
// :: INFO DAGScheduler: waiting: Set(ShuffleMapStage , ShuffleMapStage , ResultStage )
// :: INFO DAGScheduler: failed: Set()
// :: INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID ) in ms on localhost (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 288.3 KB)
// :: INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1800.0 B, free 290.0 KB)
// :: INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost: (size: 1800.0 B, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at map at MobileLocation.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 2.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 2.0 (TID )
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 0.0 in stage 2.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID ) in ms on localhost (/)
// :: INFO Executor: Running task 1.0 in stage 2.0 (TID )
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 1.0 in stage 2.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO Executor: Running task 2.0 in stage 2.0 (TID )
// :: INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID ) in ms on localhost (/)
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 2.0 in stage 2.0 (TID ). bytes result sent to driver
// :: INFO DAGScheduler: ShuffleMapStage (map at MobileLocation.scala:) finished in 0.673 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set()
// :: INFO DAGScheduler: waiting: Set(ShuffleMapStage , ResultStage )
// :: INFO DAGScheduler: failed: Set()
// :: INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID ) in ms on localhost (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at groupBy at MobileLocation.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.1 KB, free 294.2 KB)
// :: INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 296.2 KB)
// :: INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost: (size: 2.1 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at groupBy at MobileLocation.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 3.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID , localhost, partition ,PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 3.0 (TID )
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 0.0 in stage 3.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID , localhost, partition ,PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 1.0 in stage 3.0 (TID )
// :: INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID ) in ms on localhost (/)
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 1.0 in stage 3.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID , localhost, partition ,PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 2.0 in stage 3.0 (TID )
// :: INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID ) in ms on localhost (/)
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO Executor: Finished task 2.0 in stage 3.0 (TID ). bytes result sent to driver
// :: INFO DAGScheduler: ShuffleMapStage (groupBy at MobileLocation.scala:) finished in 0.606 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set()
// :: INFO DAGScheduler: waiting: Set(ResultStage )
// :: INFO DAGScheduler: failed: Set()
// :: INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID ) in ms on localhost (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at saveAsTextFile at MobileLocation.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 66.4 KB, free 362.6 KB)
// :: INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 23.0 KB, free 385.6 KB)
// :: INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost: (size: 23.0 KB, free: 517.3 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at saveAsTextFile at MobileLocation.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 4.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 4.0 (TID )
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000000_10' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000000
// :: INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000000_10: Committed
// :: INFO Executor: Finished task 0.0 in stage 4.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO Executor: Running task 1.0 in stage 4.0 (TID )
// :: INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID ) in ms on localhost (/)
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost: in memory (size: 2.2 KB, free: 517.3 MB)
// :: INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost: in memory (size: 2.1 KB, free: 517.3 MB)
// :: INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost: in memory (size: 1800.0 B, free: 517.3 MB)
// :: INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost: in memory (size: 2.2 KB, free: 517.4 MB)
// :: INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000001_11' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000001
// :: INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000001_11: Committed
// :: INFO Executor: Finished task 1.0 in stage 4.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID , localhost, partition ,NODE_LOCAL, bytes)
// :: INFO Executor: Running task 2.0 in stage 4.0 (TID )
// :: INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID ) in ms on localhost (/)
// :: INFO ShuffleBlockFetcherIterator: Getting non-empty blocks out of blocks
// :: INFO ShuffleBlockFetcherIterator: Started remote fetches in ms
// :: INFO FileOutputCommitter: Saved output of task 'attempt_201701241128_0004_m_000002_12' to hdfs://hadoop01:9000/mobile/output/_temporary/0/task_201701241128_0004_m_000002
// :: INFO SparkHadoopMapRedUtil: attempt_201701241128_0004_m_000002_12: Committed
// :: INFO Executor: Finished task 2.0 in stage 4.0 (TID ). bytes result sent to driver
// :: INFO DAGScheduler: ResultStage (saveAsTextFile at MobileLocation.scala:) finished in 3.750 s
// :: INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID ) in ms on localhost (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Job finished: saveAsTextFile at MobileLocation.scala:, took 14.029200 s
// :: INFO SparkUI: Stopped Spark web UI at http://192.168.71.11:4040
// :: INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
// :: INFO MemoryStore: MemoryStore cleared
// :: INFO BlockManager: BlockManager stopped
// :: INFO BlockManagerMaster: BlockManagerMaster stopped
// :: INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
// :: INFO SparkContext: Successfully stopped SparkContext
// :: INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
// :: INFO ShutdownHookManager: Shutdown hook called
// :: INFO ShutdownHookManager: Deleting directory /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e
// :: INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
// :: INFO ShutdownHookManager: Deleting directory /tmp/spark-eaa76419-9ddc-43c1-ad0e-cc95c0dede7e/httpd--8d9c-4ea7-9fca-5caf8c712f86
[root@hadoop01 ~]#
[root@hadoop01 ~]#
[root@hadoop01 ~]#

8. Check the output

[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00000
[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00001
(18688888888,List((16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18688888888,51200),(116.304864,40.050645)))))
(18611132889,List((16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296))), (9F36407EAD0629FC166F14DDE7970F68,((18611132889,54000),(116.304864,40.050645)))))
[root@hadoop01 ~]# hdfs dfs -cat /mobile/output/part-00002
[root@hadoop01 ~]#
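
The saved lines are the default string form of nested tuples, which is awkward to process further. One option is to flatten each entry into a comma-separated line before saving. This is a sketch only: `FlattenResult`, `Row` and `flatten` are hypothetical names, and the column order is an arbitrary choice.

```scala
object FlattenResult {
  // Shape of one entry in the per-phone list produced by the program:
  // (stationId, ((mobile, time), (longitude, latitude)))
  type Row = (String, ((String, Long), (String, String)))

  // Produces one "mobile,stationId,time,longitude,latitude" line per entry.
  def flatten(mobile: String, rows: List[Row]): List[String] =
    rows.map { case (station, ((_, total), (lon, lat))) =>
      Seq(mobile, station, total.toString, lon, lat).mkString(",")
    }

  def main(args: Array[String]): Unit = {
    val rows: List[Row] = List(
      ("16030401EAFB68F1E3CDF819735E1C66", (("18688888888", 87600L), ("116.296302", "40.032296"))),
      ("9F36407EAD0629FC166F14DDE7970F68", (("18688888888", 51200L), ("116.304864", "40.050645"))))
    flatten("18688888888", rows).foreach(println)
  }
}
```

In the Spark program this could be applied as `result.flatMap { case (mobile, rows) => FlattenResult.flatten(mobile, rows) }` before calling `saveAsTextFile`.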