package com.oncedq.code; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;

import com.oncedq.code.util.DateUtil;

/**
 * Map-only Hadoop job that pushes order records through a chain of four
 * mappers: extract (parse text) -> filter (keep state "F") -> rewrite state
 * ("F" becomes "Find") -> load (pass through to the output format).
 *
 * <p>Input is read from the {@code orderData} path and results are written to
 * {@code orderDataOutput}. Each input line is expected to look like
 * {@code orderKey;customer;state;price;yyyy-MM-dd}.
 */
public class ProcessSample {

    /** Separator between the fields of an order line (input and text output). */
    private static final String FIELD_SEPARATOR = ";";

    /** Number of fields the extract step reads from every order line. */
    private static final int FIELD_COUNT = 5;

    /** Pattern handed to {@code DateUtil} for parsing and formatting order dates. */
    private static final String DATE_PATTERN = "yyyy-MM-dd";

    /** State value that the filter step lets through. */
    private static final String FINISHED_STATE = "F";

    /** Text that replaces every {@link #FINISHED_STATE} occurrence in the state field. */
    private static final String FINISHED_STATE_LABEL = "Find";

    /** How often the driver checks whether the controlled job has finished. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * First step of the chain: parses one {@code ;}-separated text line into a
     * {@link Conn1} record keyed by its order key.
     */
    public static class ExtractMappper extends MapReduceBase implements
            Mapper<LongWritable, Text, LongWritable, Conn1> {

        /**
         * Parses {@code value} and emits {@code (orderKey, record)}.
         *
         * @param key       input key supplied by the input format; only used in error messages
         * @param value     one line of order data
         * @param collector receives the parsed record
         * @param reporter  unused
         * @throws IOException if the line has fewer than five fields or a numeric
         *                     field cannot be parsed
         */
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<LongWritable, Conn1> collector, Reporter reporter)
                throws IOException {
            String line = value.toString();
            if (line.trim().isEmpty()) {
                // A blank line (typically a trailing newline) carries no record.
                return;
            }

            String[] fields = line.split(FIELD_SEPARATOR);
            if (fields.length < FIELD_COUNT) {
                throw new IOException("Malformed order record (key " + key + "): expected "
                        + FIELD_COUNT + " fields but found " + fields.length + ": " + line);
            }

            Conn1 record = new Conn1();
            try {
                record.orderKey = Long.parseLong(fields[0]);
                record.customer = Long.parseLong(fields[1]);
                record.price = Double.parseDouble(fields[3]);
            } catch (NumberFormatException e) {
                throw new IOException("Malformed order record (key " + key
                        + "): non-numeric field in: " + line, e);
            }
            record.state = fields[2];
            // NOTE(review): DateUtil's behaviour on an unparseable date is not
            // visible from this file - confirm it cannot hand back null.
            record.orderDate = DateUtil.getDateFromString(fields[4], DATE_PATTERN);

            collector.collect(new LongWritable(record.orderKey), record);
        }
    }

    /**
     * Second step of the chain: forwards only records whose state equals
     * {@code "F"}, converted to {@link Conn2}.
     */
    public static class Filter1Mapper extends MapReduceBase implements
            Mapper<LongWritable, Conn1, LongWritable, Conn2> {

        /**
         * Emits a {@link Conn2} copy of {@code c2} when its state is {@code "F"};
         * emits nothing otherwise.
         *
         * @param inKey     key passed through unchanged
         * @param c2        record produced by the extract step
         * @param collector receives the surviving record
         * @param report    unused
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable inKey, Conn1 c2,
                OutputCollector<LongWritable, Conn2> collector, Reporter report)
                throws IOException {
            if (!FINISHED_STATE.equals(c2.state)) {
                return;
            }
            Conn2 kept = new Conn2();
            kept.copyFrom(c2);
            collector.collect(inKey, kept);
        }
    }

    /**
     * Third step of the chain: rewrites the state field, replacing every
     * {@code "F"} with {@code "Find"}, and emits the result as {@link Conn3}.
     */
    public static class RegexMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn2, LongWritable, Conn3> {

        /**
         * Emits a {@link Conn3} copy of {@code c3} with the rewritten state.
         *
         * @param inKey     key passed through unchanged
         * @param c3        record produced by the filter step
         * @param collector receives the rewritten record
         * @param report    unused
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable inKey, Conn2 c3,
                OutputCollector<LongWritable, Conn3> collector, Reporter report)
                throws IOException {
            Conn3 rewritten = new Conn3();
            rewritten.copyFrom(c3);
            // Literal replacement; no regular expression is needed for "F".
            rewritten.state = c3.state.replace(FINISHED_STATE, FINISHED_STATE_LABEL);
            collector.collect(inKey, rewritten);
        }
    }

    /**
     * Last step of the chain: hands every record unchanged to the job's
     * output format.
     */
    public static class LoadMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn3, LongWritable, Conn3> {

        /**
         * Emits {@code (arg0, arg1)} as received.
         *
         * @param arg0 key passed through unchanged
         * @param arg1 record passed through unchanged
         * @param arg2 receives the pair
         * @param arg3 unused
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable arg0, Conn3 arg1,
                OutputCollector<LongWritable, Conn3> arg2, Reporter arg3)
                throws IOException {
            arg2.collect(arg0, arg1);
        }
    }

    /**
     * Fields and wire format shared by the records exchanged between chain
     * steps. The three concrete subclasses exist only so that each link of the
     * chain has its own value type.
     *
     * <p>Ordering is by {@link #orderKey} alone; {@code equals} is not
     * overridden, so the ordering is not consistent with {@code equals}.
     *
     * @param <T> the concrete record type, used as the {@code compareTo} argument
     */
    private abstract static class OrderRecord<T extends OrderRecord<T>>
            implements WritableComparable<T> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        /** Copies every field of {@code source} into this record (the date is shared, not cloned). */
        void copyFrom(OrderRecord<?> source) {
            orderKey = source.orderKey;
            customer = source.customer;
            state = source.state;
            price = source.price;
            orderDate = source.orderDate;
        }

        /** Reads the fields in the exact order {@link #write(DataOutput)} emits them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), DATE_PATTERN);
        }

        /**
         * Writes key, customer, state, price and the order date formatted with
         * {@code yyyy-MM-dd}; any time-of-day component is therefore not
         * carried across serialization.
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, DATE_PATTERN));
        }

        /** Orders records by ascending {@link #orderKey}. */
        @Override
        public int compareTo(T other) {
            if (orderKey < other.orderKey) {
                return -1;
            }
            return orderKey == other.orderKey ? 0 : 1;
        }

        /**
         * Renders the record as {@code orderKey;customer;state;price;yyyy-MM-dd}.
         * {@code TextOutputFormat} writes values through {@code toString()}, so
         * this is what ends up in the job's output files.
         */
        @Override
        public String toString() {
            return orderKey + FIELD_SEPARATOR
                    + customer + FIELD_SEPARATOR
                    + state + FIELD_SEPARATOR
                    + price + FIELD_SEPARATOR
                    + DateUtil.getDateStr(orderDate, DATE_PATTERN);
        }
    }

    /** Record type flowing from the extract step to the filter step. */
    private static class Conn1 extends OrderRecord<Conn1> {
    }

    /** Record type flowing from the filter step to the state-rewrite step. */
    private static class Conn2 extends OrderRecord<Conn2> {
    }

    /** Record type flowing from the state-rewrite step to the load step and the output. */
    private static class Conn3 extends OrderRecord<Conn3> {
    }

    /**
     * Configures the four-step mapper chain, runs it and waits for completion.
     * Terminates the JVM with exit status 1 when the job cannot be created,
     * fails, or the wait is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName("ProcessSample");
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        // addMapper is static; each step gets its own private configuration.
        ChainMapper.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,
                LongWritable.class, Conn1.class, true, new JobConf());
        ChainMapper.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
                LongWritable.class, Conn2.class, true, new JobConf());
        ChainMapper.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
                LongWritable.class, Conn3.class, true, new JobConf());
        ChainMapper.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
                LongWritable.class, Conn3.class, true, new JobConf());

        FileInputFormat.setInputPaths(job, new Path("orderData"));
        FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));

        boolean succeeded;
        try {
            succeeded = runToCompletion(new Job(job));
        } catch (IOException e) {
            e.printStackTrace();
            succeeded = false;
        }
        if (!succeeded) {
            System.exit(1);
        }
    }

    /**
     * Runs {@code controlledJob} under a {@link JobControl} and blocks until it
     * has finished.
     *
     * <p>{@code JobControl.run()} keeps looping until {@code stop()} is called,
     * so it is executed on a separate thread while this thread polls
     * {@code allFinished()}.
     *
     * @return {@code true} when the job finished without being reported as
     *         failed; {@code false} on failure or if the wait was interrupted
     */
    private static boolean runToCompletion(Job controlledJob) {
        JobControl jc = new JobControl("test");
        jc.addJob(controlledJob);

        Thread runner = new Thread(jc, "ProcessSample-JobControl");
        // Daemon so an unexpected error on the main thread cannot leave the JVM hanging.
        runner.setDaemon(true);
        runner.start();

        try {
            while (!jc.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            jc.stop();
        }
        return jc.getFailedJobs().isEmpty();
    }
}