自定义InputFormat

大数据(hadoop-自定义数据类型、文件格式)-LMLPHP

OutputFormat

大数据(hadoop-自定义数据类型、文件格式)-LMLPHP

示例代码

package com.vip09;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop writable holding one student's scores in five subjects
 * (Chinese, math, English, physics, chemistry).
 *
 * <p>The five values are serialized as consecutive floats, in the order listed
 * above, by {@link #write(DataOutput)} and restored in the same order by
 * {@link #readFields(DataInput)}.
 */
public class ScoreWritable implements WritableComparable<Object>{

	// Plain Java primitives are recommended for the fields of a custom data type.
	private float chinese ;
	private float math ;
	private float english ;
	private float physics ;
	private float chemistry ;

	/** A custom data type must provide a no-argument constructor. */
	public ScoreWritable(){}

	/**
	 * Creates an instance populated with the given scores.
	 *
	 * @param chinese   Chinese score
	 * @param math      math score
	 * @param english   English score
	 * @param physics   physics score
	 * @param chemistry chemistry score
	 */
	public ScoreWritable(float chinese, float math, float english, float physics, float chemistry) {
		set(chinese, math, english, physics, chemistry);
	}

	/**
	 * Replaces all five scores at once, so a single instance can be reused
	 * across records.
	 *
	 * @param chinese   Chinese score
	 * @param math      math score
	 * @param english   English score
	 * @param physics   physics score
	 * @param chemistry chemistry score
	 */
	public final void set(float chinese, float math, float english, float physics, float chemistry){
		this.chinese = chinese;
		this.math = math;
		this.english = english;
		this.physics = physics;
		this.chemistry = chemistry;
	}

	/** @return the Chinese score */
	public float getChinese() {
		return chinese;
	}

	/** @return the math score */
	public float getMath() {
		return math;
	}

	/** @return the English score */
	public float getEnglish() {
		return english;
	}

	/** @return the physics score */
	public float getPhysics() {
		return physics;
	}

	/** @return the chemistry score */
	public float getChemistry() {
		return chemistry;
	}

	/**
	 * Serializes the five scores; called when the value is written out.
	 *
	 * @param out sink receiving the five floats
	 * @throws IOException if writing to {@code out} fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeFloat(chinese);
		out.writeFloat(math);
		out.writeFloat(english);
		out.writeFloat(physics);
		out.writeFloat(chemistry);
	}

	/**
	 * Deserializes the five scores; called when the value is read back.
	 * The read order must match the order used by {@link #write(DataOutput)}.
	 *
	 * @param in source providing the five floats
	 * @throws IOException if reading from {@code in} fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		chinese = in.readFloat() ;
		math = in.readFloat() ;
		english = in.readFloat() ;
		physics = in.readFloat() ;
		chemistry = in.readFloat() ;
	}

	/**
	 * Orders instances subject by subject (Chinese, math, English, physics,
	 * chemistry), using {@link Float#compare(float, float)} for each field.
	 *
	 * @param o the object to compare with; must be a {@code ScoreWritable}
	 * @return a negative, zero or positive value as this instance sorts before,
	 *         equal to or after {@code o}
	 * @throws ClassCastException   if {@code o} is not a {@code ScoreWritable}
	 * @throws NullPointerException if {@code o} is {@code null}
	 */
	@Override
	public int compareTo(Object o) {
		ScoreWritable other = (ScoreWritable) o;
		int result = Float.compare(chinese, other.chinese);
		if (result == 0) {
			result = Float.compare(math, other.math);
		}
		if (result == 0) {
			result = Float.compare(english, other.english);
		}
		if (result == 0) {
			result = Float.compare(physics, other.physics);
		}
		if (result == 0) {
			result = Float.compare(chemistry, other.chemistry);
		}
		return result;
	}

	/** Two instances are equal when {@link #compareTo(Object)} reports zero. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof ScoreWritable)) {
			return false;
		}
		return compareTo(obj) == 0;
	}

	/** Hash over the bit patterns of the five scores, consistent with {@link #equals(Object)}. */
	@Override
	public int hashCode() {
		int hash = Float.floatToIntBits(chinese);
		hash = 31 * hash + Float.floatToIntBits(math);
		hash = 31 * hash + Float.floatToIntBits(english);
		hash = 31 * hash + Float.floatToIntBits(physics);
		hash = 31 * hash + Float.floatToIntBits(chemistry);
		return hash;
	}

	/** @return the five scores separated by tabs, in serialization order */
	@Override
	public String toString() {
		return chinese + "\t" + math + "\t" + english + "\t" + physics + "\t" + chemistry;
	}

}
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce driver that reads student score records through
 * {@code ScoreInputFormat} and writes, per record, the total and the average
 * of the five subject scores.
 */
public class ScoreCount extends Configured implements Tool{

	/** Pass-through mapper: the input format already delivers the final key/value pair. */
	public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable>{
		@Override
		protected void map(Text key, ScoreWritable value,
				Context context)
				throws IOException, InterruptedException {
			context.write(key, value);
		}
	}

	/** Emits "total TAB average" for every score record grouped under a key. */
	public static class ScoreReducer extends Reducer<Text, ScoreWritable, Text, Text>{
		private Text text = new Text() ;

		/**
		 * Writes one output line per score record.
		 *
		 * <p>The write happens inside the loop so that, when several records
		 * share the same key, none of them is silently dropped.
		 */
		@Override
		protected void reduce(Text key, Iterable<ScoreWritable> value,
				Context context) throws IOException, InterruptedException {
			for (ScoreWritable sw : value) {
				float totalScore = sw.getChinese() + sw.getEnglish() + sw.getMath() + sw.getPhysics() + sw.getChemistry() ;
				float avgScore = totalScore/5 ;
				text.set(totalScore + "\t" + avgScore);
				context.write(key, text);
			}
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output
	 *             path; an existing output path is deleted first
	 * @return 0 when the job succeeds, 1 when it fails, 2 on bad arguments
	 * @throws Exception if the file system or job submission fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		if (args == null || args.length < 2) {
			System.err.println("Usage: ScoreCount <input path> <output path>");
			return 2;
		}

		// Use the configuration injected by ToolRunner so that generic options
		// (-D, -conf, -fs ...) are honoured; fall back when run() is called directly.
		Configuration conf = getConf() ;
		if (conf == null) {
			conf = new Configuration() ;
		}

		// Remove the output path if it already exists, otherwise the job refuses to start.
		Path outputPath = new Path(args[1]) ;
		FileSystem hdfs = outputPath.getFileSystem(conf);
		if(hdfs.exists(outputPath)){
			hdfs.delete(outputPath, true) ;
		}

		Job job = Job.getInstance(conf, "scorecount") ;
		job.setJarByClass(ScoreCount.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, outputPath);

		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);

		// Custom intermediate types must be declared explicitly.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ScoreWritable.class);

		// Declare the reducer's output types as well instead of relying on defaults.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Plug in the custom input format.
		job.setInputFormatClass(ScoreInputFormat.class);

		return job.waitForCompletion(true) ? 0 : 1 ;
	}

	/**
	 * Entry point. Uses the command-line arguments when given, otherwise the
	 * built-in sample input/output locations.
	 */
	public static void main(String[] args) throws Exception {
		String[] args0 = {"hdfs://192.168.153.111:9000/input5",
		"hdfs://192.168.153.111:9000/output15"} ;
		String[] jobArgs = (args != null && args.length > 0) ? args : args0 ;
		int res = ToolRunner.run(new ScoreCount(), jobArgs) ;
		System.exit(res);
	}

}
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * File-based input format that yields {@code Text} keys and
 * {@code ScoreWritable} values.
 *
 * <p>Every input format needs a matching record reader; this one delegates
 * the actual reading to {@code ScoreRecordReader}.
 */
public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {

	/**
	 * Supplies the reader used for a split.
	 *
	 * @param split   the split to be read (not inspected here)
	 * @param context the task attempt context (not inspected here)
	 * @return a new, not yet initialized {@code ScoreRecordReader}
	 */
	@Override
	public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		RecordReader<Text, ScoreWritable> reader = new ScoreRecordReader();
		return reader;
	}

}
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Record reader that turns each text line of a file split into a
 * {@code Text} key and a {@code ScoreWritable} value.
 *
 * <p>A line is split on whitespace and must have exactly seven fields: the
 * first two are joined with a tab to form the key, the remaining five are
 * parsed as float scores. Blank lines are skipped.
 *
 * <p>The reader only consumes the lines belonging to its own split: a split
 * that does not start at offset 0 discards its first (possibly partial) line,
 * and every split reads through the line that crosses or starts at its end
 * offset.
 */
public class ScoreRecordReader extends RecordReader<Text, ScoreWritable>{

	/** Number of whitespace-separated fields expected on a line. */
	private static final int FIELD_COUNT = 7 ;
	/** Index of the first score field; the fields before it form the key. */
	private static final int FIRST_SCORE_INDEX = 2 ;

	public LineReader in ;				// line reader over the split's file
	public Text lineKey ;				// reusable key object
	public ScoreWritable linevalue ;	// reusable value object
	public Text line ;					// buffer holding the current raw line

	private long start ;				// byte offset where this split begins
	private long end ;					// byte offset just past this split
	private long pos ;					// byte offset of the next unread line

	/**
	 * Opens the split's file and positions the reader at the first line that
	 * belongs to this split. Called once per split.
	 *
	 * @param split   the split to read; must be a {@code FileSplit}
	 * @param context task context supplying the configuration
	 * @throws IOException if the file cannot be opened or positioned
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		FileSplit fsplit = (FileSplit)split ;
		Configuration conf = context.getConfiguration();
		Path file = fsplit.getPath();
		FileSystem fs = file.getFileSystem(conf);

		start = fsplit.getStart();
		end = start + fsplit.getLength();

		FSDataInputStream filein = fs.open(file);
		try{
			filein.seek(start);
			in = new LineReader(filein, conf) ;
			line = new Text() ;
			lineKey = new Text() ;
			linevalue = new ScoreWritable() ;

			pos = start ;
			if(start != 0){
				// The line in progress at the split start is read by the
				// reader of the preceding split, so skip it here.
				pos += in.readLine(line);
			}
		}catch(IOException ioe){
			// Do not leak the stream when positioning fails.
			filein.close();
			throw ioe ;
		}
	}

	/**
	 * Advances to the next non-blank line of the split and parses it.
	 *
	 * @return {@code true} if a key/value pair was produced, {@code false}
	 *         when the split is exhausted
	 * @throws IOException if reading fails, a line does not have seven fields,
	 *                     or a score field is not a valid float
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		// "<=" so that the line starting exactly at the split end is read here;
		// the following split skips it as its first line.
		while(pos <= end){
			int linesize = in.readLine(line);
			if(linesize == 0){
				return false ;	// end of file
			}
			pos += linesize ;

			String record = line.toString().trim() ;
			if(record.isEmpty()){
				continue ;		// tolerate blank lines, e.g. a trailing newline
			}

			String[] pieces = record.split("\\s+") ;
			if(pieces.length != FIELD_COUNT){
				throw new IOException("无效的数据") ;
			}

			// Convert each subject score to float; a malformed number is
			// treated like any other invalid record instead of becoming 0.
			float[] scores = new float[FIELD_COUNT - FIRST_SCORE_INDEX] ;
			try{
				for(int i = 0; i < scores.length; i++){
					scores[i] = Float.parseFloat(pieces[FIRST_SCORE_INDEX + i]) ;
				}
			}catch(NumberFormatException nfe){
				throw new IOException("无效的数据", nfe) ;
			}

			lineKey.set(pieces[0] + "\t" + pieces[1]);
			linevalue.set(scores[0], scores[1], scores[2], scores[3], scores[4]);
			return true;
		}
		return false ;
	}

	/** @return the key produced by the last successful {@link #nextKeyValue()} */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return lineKey;
	}

	/** @return the value produced by the last successful {@link #nextKeyValue()} */
	@Override
	public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
		return linevalue;
	}

	/** @return the fraction of the split consumed so far, between 0 and 1 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if(end == start){
			return 0.0f ;
		}
		return Math.min(1.0f, (pos - start) / (float)(end - start)) ;
	}

	/** Closes the underlying line reader, if one was opened. */
	@Override
	public void close() throws IOException {
		if(in != null){
			in.close();
		}
	}

}
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce driver that counts how often each e-mail address occurs in the
 * input and writes the counts into separate output files, one per mail
 * provider (the part of the address between '@' and the following '.').
 */
public class MapReduceCaseEmail extends Configured implements Tool{

	/** Emits every input line as a key with a count of one. */
	public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		private final static IntWritable one = new IntWritable(1) ;
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			context.write(value, one);
		}
	}

	/** Sums the counts per address and routes the result to a per-provider file. */
	public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		private IntWritable result = new IntWritable() ;
		// MultipleOutputs allows writing to several files or directories.
		private MultipleOutputs<Text, IntWritable> mout ;

		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			mout = new MultipleOutputs<Text, IntWritable>(context) ;
		}

		/**
		 * Writes "address TAB count" to the output named after the provider.
		 * Keys without an '@', without a '.' after the '@', or with an empty
		 * provider name are skipped.
		 */
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			String address = key.toString() ;
			int begin = address.indexOf('@') ;
			if(begin < 0){
				return ;
			}
			// Search for the dot after the '@' so that dots in the local part
			// (e.g. "first.last@qq.com") do not hide the provider.
			int end = address.indexOf('.', begin + 1) ;
			if(end <= begin + 1){
				return ;	// no dot after '@', or nothing between '@' and '.'
			}

			// Provider name, e.g. "qq" or "163".
			String name = address.substring(begin + 1, end);
			int sum = 0 ;
			for (IntWritable value : values) {
				sum += value.get() ;
			}
			result.set(sum);
			// Output files are named <baseOutputPath>-r-nnnnn.
			mout.write(key, result, name);
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			mout.close();
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output
	 *             path; an existing output path is deleted first
	 * @return 0 when the job succeeds, 1 when it fails, 2 on bad arguments
	 * @throws Exception if the file system or job submission fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		if (args == null || args.length < 2) {
			System.err.println("Usage: MapReduceCaseEmail <input path> <output path>");
			return 2;
		}

		// Use the configuration injected by ToolRunner so that generic options
		// (-D, -conf, -fs ...) are honoured; fall back when run() is called directly.
		Configuration conf = getConf() ;
		if (conf == null) {
			conf = new Configuration() ;
		}

		// Remove the output path if it already exists, otherwise the job refuses to start.
		Path outputPath = new Path(args[1]) ;
		FileSystem hdfs = outputPath.getFileSystem(conf);
		if(hdfs.exists(outputPath)){
			hdfs.delete(outputPath, true) ;
		}

		Job job = Job.getInstance(conf, "emailcount") ;
		job.setJarByClass(MapReduceCaseEmail.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, outputPath);

		job.setMapperClass(EmailMapper.class);
		job.setReducerClass(EmailReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		return job.waitForCompletion(true) ? 0 : 1 ;
	}

	/**
	 * Entry point. Uses the command-line arguments when given, otherwise the
	 * built-in sample input/output locations.
	 */
	public static void main(String[] args) throws Exception {
		String[] args0 = {"hdfs://192.168.153.111:9000/input6",
		"hdfs://192.168.153.111:9000/output16"} ;
		String[] jobArgs = (args != null && args.length > 0) ? args : args0 ;
		int res = ToolRunner.run(new MapReduceCaseEmail(), jobArgs) ;
		System.exit(res);
	}

}








06-12 00:44