点击(此处)折叠或打开
- package com.hdp.design;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.MapWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- import java.util.*;
- public class MyStripesJob {
- /*
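- Input: each line is the comma-separated list of product IDs one user purchased. Each "map input:" entry further below is followed by the key,[neighbour,count] pairs the mapper emits for that line. Sample input: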
- 2,3,1,4,5,2,3
- 1,2,5,2
- 4,5
- 1,3,4,1
- 3,1
- 4,2,1,5,5,3
- map input:2,3,1,4,5,2,3
- 1,[2,2]
- 1,[3,2]
- 1,[4,1]
- 1,[5,1]
- 2,[3,2]
- 2,[4,1]
- 2,[5,1]
- 2,[3,2]
- 2,[4,1]
- 2,[5,1]
- 3,[4,1]
- 3,[5,1]
- 3,[4,1]
- 3,[5,1]
- 4,[5,1]
- map input:1,2,5,2
- 1,[2,2]
- 1,[5,1]
- 2,[5,1]
- 2,[5,1]
- map input:4,5
- 4,[5,1]
- map input:1,3,4,1
- 1,[3,1]
- 1,[4,1]
- 1,[3,1]
- 1,[4,1]
- 3,[4,1]
- map input:3,1
- 1,[3,1]
- map input:4,2,1,5,5,3
- 1,[2,1]
- 1,[3,1]
- 1,[4,1]
- 1,[5,2]
- 2,[3,1]
- 2,[4,1]
- 2,[5,2]
- 3,[4,1]
- 3,[5,2]
- 4,[5,2]
- */
- public static class MyStripesMapper extends Mapper<Object, Text, Text, MapWritable>{
- @Override
- protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String []arrays = value.toString().split(",");
- System.out.println("map input:"+ value.toString());
- //对输入数据排序,避免将同时购买商品[1,2]和[2,1]分别计数
- List<String> list = Arrays.asList(arrays);
- Collections.sort(list);
- for (int i=0;i<list.size();i++){
- MapWritable mapWritable = new MapWritable();
- String curPrdId = list.get(i);
- for (int j=i+1;j<list.size();j++){
- String nextPrdId = list.get(j);
- Text prdIdKey = new Text(nextPrdId);
- if(curPrdId.equals(nextPrdId)){//过滤相同的商品ID
- continue;
- }
- //对购买商品ID次数累计
- if(mapWritable.get(prdIdKey) ==null){
- mapWritable.put(prdIdKey, new IntWritable(1));
- }else {
- IntWritable cnt = (IntWritable) mapWritable.get(prdIdKey);
- cnt.set(cnt.get()+1);
- mapWritable.put(prdIdKey,cnt);
- }
- }
- if(mapWritable.size()>0){
- System.out.println("map out:");
- for (MapWritable.Entry tmpMap:mapWritable.entrySet()){
- System.out.println(curPrdId+",["+tmpMap.getKey().toString() +","+ tmpMap.getValue()+"]");
- }
- context.write(new Text(curPrdId), mapWritable);
- }
- }
- }
- }
- /*
- reduce output
- [1,2] 5
- [1,3] 6
- [1,4] 4
- [1,5] 4
- [2,3] 5
- [2,4] 3
- [2,5] 6
- [3,4] 4
- [3,5] 4
- [4,5] 4
- *
- * */
- public static class MyStripesReduce extends Reducer<Text, MapWritable, Text,Text>{
- @Override
- protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
- System.out.println("reduce input:"+key+","+ values.toString());
- HashMap<String, Integer> map = new HashMap<String, Integer>();
- for (MapWritable value: values){
- for (Writable mapKey:value.keySet()){
- String strKey = mapKey.toString();
- //对收到的数据在次进行累加
- int num = Integer.parseInt(value.get(mapKey).toString());
- if(map.get(strKey)==null){
- map.put(strKey,num);
- }else {
- map.put(strKey, (Integer)map.get(strKey)+num);
- }
- }
- }
- for (Map.Entry<String, Integer> tmpMap : map.entrySet()){
- String outKey = "["+key.toString()+","+tmpMap.getKey()+"]";
- System.out.println("reduce out:"+ outKey+ ","+ String.valueOf(tmpMap.getValue()));
- context.write(new Text(outKey), new Text( String.valueOf(tmpMap.getValue())));
- }
- }
- }
- public static void main(String []args){
- try {
- Job job = Job.getInstance();
- job.setJobName("MyStripesJob");
- job.setJarByClass(MyStripesJob.class);
- job.setMapperClass(MyStripesMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(MapWritable.class);
- job.setReducerClass(MyStripesReduce.class);
- job.setOutputValueClass(Text.class);
- job.setOutputKeyClass(Text.class);
- job.setNumReduceTasks(1);
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);
- System.out.println(job.waitForCompletion(true));
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- }