1、业务需求

在拥有手机号在每个基站处停留时间日志 和 基站信息的 算出某个手机号的(所在基站,停留时间),(当前所在经度,当前所在纬度)

其中手机连接基站产生的日志信息类似如下:

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

上面的含义表示的是:手机号,时间,基站ID,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)

基站信息:

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

上面的含义表示的是:基站ID,经度,纬度,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)

编写Scale代码:

	package com.Hive

	import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} object FD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FD").setMaster("local[2]")
val sc = new SparkContext(conf) //1.读取数据文件 val user =sc.textFile("src/main/data/log/")//用户数据
val base = sc.textFile("src/main/data/base_info.txt")//基站数据 //2.数据清洗工作,数据维度提取
// 用户数据清洗
val splited = user.map(line =>{ val fields = line.split(",")
val phone = fields(0) val base = fields(2)
val envet = fields(3).toInt val time = {
if (envet == 1){
-fields(1).toLong//赋值-
}else{
fields(1).toLong//正值+
}
} ((phone,base),time)
}) // splited.collect().foreach(println(_)) // 基站数据清洗
val alcsplited = base.map(line =>{
val fields = line.split(",")
val id = fields(0)
val x = fields(1)
val y = fields(2)
(id,(x,y))
}) // splited.collect().foreach(println(_)) //3.统计每个用户在每个基站中停留的时间 val reducted = splited.reduceByKey(_+_) // reducted.collect().foreach(println(_)) //((phone,base),time)
val pmt = reducted.map(x=>{ //(基站ID,(手机号,时间))
//x._1对应的是元组((mobile,lac),time)中的(mobile,lac)
//x._2对应的是元组((mobile,lac),time)中的time
((x._1._2),(x._1._1,x._2)) }) //连接join 之后的结果[(基站ID,((手机号,时间),(经度,纬度)))] val joined:RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited) //按照手机号进行分组
//_. :代表的是基站 手机号,时间,经度,纬度
//_._2 :代表的是 手机号,时间 经度,纬度
//_._2_1 :代表的是 手机号,时间
//_._2._1._ :代表的是 手机号
val MobileGroupBykey = joined.groupBy(_._2._1._1) val result = MobileGroupBykey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2)) println(result.collect().toBuffer) sc.stop() } }
05-11 16:04