根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式一

根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式一-LMLPHP

根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式一-LMLPHP
测试数据
根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式一-LMLPHP
java代码
 package com.hzf.spark.study;

 import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; public class HotChannel01 {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("HotChannel")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("userLog1");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
hotChannel(sc, logRDD, broadcast);
}
private static void hotChannel(JavaSparkContext sc,JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split("\t")[5];
return actionParam.equals(action);
}
}); JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair(new PairFunction<String, String,String>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, String> call(String val) throws Exception {
String channel = val.split("\t")[4]; return new Tuple2<String, String>(channel,null);
}
});
Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
Set<String> keySet = channelPVMap.keySet();
List<SortObj> channels = new ArrayList<>();
for(String channel : keySet){
channels.add(new SortObj(channel, Integer.valueOf(channelPVMap.get(channel)+"")));
}
Collections.sort(channels, new Comparator<SortObj>() { @Override
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
}); List<String> hotChannelList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
hotChannelList.add(channels.get(i).getKey());
}
for(String channle : hotChannelList){
System.out.println("channle:" + channle);
} final Broadcast<List<String>> hotChannelListBroadcast = sc.broadcast(hotChannelList); JavaRDD<String> filtedRDD = logRDD.filter(new Function<String, Boolean>() { @Override
public Boolean call(String v1) throws Exception {
List<String> hostChannels = hotChannelListBroadcast.value();
String channel = v1.split("\t")[4];
String userId = v1.split("\t")[2];
return hostChannels.contains(channel) && !"null".equals(userId);
}
}); JavaPairRDD<String, String> channel2UserRDD = filtedRDD.mapToPair(new PairFunction<String, String, String>() { @Override
public Tuple2<String, String> call(String v1) throws Exception {
String[] splited = v1.split("\t");
String channel = splited[4];
String userId = splited[2];
return new Tuple2<String, String>(channel,userId);
}
}); channel2UserRDD.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() { private static final long serialVersionUID = 1L; @Override
public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
String channel = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Map<String, Integer> userNumMap = new HashMap<>();
while(iterator.hasNext()){
String userId = iterator.next();
Integer count = userNumMap.get(userId);
if(count == null){
count = 1;
}else{
count ++;
}
userNumMap.put(userId, count);
} List<SortObj> lists = new ArrayList<>();
Set<String> keys = userNumMap.keySet();
for(String key : keys){
lists.add(new SortObj(key, userNumMap.get(key)));
} Collections.sort(lists,new Comparator<SortObj>() { @Override
public int compare(SortObj O1, SortObj O2) {
return O2.getValue() - O1.getValue();
}
}); System.out.println("HOT_CHANNEL:"+channel);
for(int i = 0 ; i < 3 ; i++){
SortObj sortObj = lists.get(i);
System.out.println(sortObj.getKey()+"=="+sortObj.getValue());
}
}
});
}
}
result
根据PV统计出前三的热门板块,并统计出热门板块下的用户数--方式一-LMLPHP

 
05-11 13:58