1、 在hadoop中所有的key/value都必须实现Writable接口,有两个方法,分别用于读(反序列化)和写(序列化)操作。

参考代码:

 package org.dragon.hadoop.mapreduce.app;

 import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.Writable; /**
*
* @author ZhuXY
* @time 2016-3-10 下午3:49:55
*
*/
public class DataWritable implements Writable { // telsphone // upload
private int upPackNum;
private int upPayLoad; // download
private int downPackNum;
private int downPayLoad; public DataWritable() { } public void set(int upPackNum, int upPayLoad, int downPackNum,
int downPayload) {
this.upPackNum = upPackNum;
this.upPayLoad = upPayLoad;
this.downPackNum = downPackNum;
this.downPayLoad = downPayload; } public int getUpPackNum() {
return upPackNum;
} public int getUpPayLoas() {
return upPayLoad;
} public int getDownPackNum() {
return downPackNum;
} public int getDownPayload() {
return downPayLoad;
} @Override
public void write(DataOutput out) throws IOException {
out.writeInt(upPackNum);
out.writeInt(upPayLoad);
out.writeInt(downPackNum);
out.writeInt(downPayLoad);
} /**
* 讀出的順序要和寫入的順序相同
*/
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.upPackNum = in.readInt();
this.upPayLoad = in.readInt();
this.downPackNum = in.readInt();
this.downPayLoad = in.readInt();
} @Override
public String toString() {
return upPackNum + "\t" + upPayLoad + "\t" + downPackNum + "\t"
+ downPayLoad;
} @Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + downPackNum;
result = prime * result + downPayLoad;
result = prime * result + upPackNum;
result = prime * result + upPayLoad;
return result;
} @Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
DataWritable other = (DataWritable) obj;
if (downPackNum != other.downPackNum)
return false;
if (downPayLoad != other.downPayLoad)
return false;
if (upPackNum != other.upPackNum)
return false;
if (upPayLoad != other.upPayLoad)
return false;
return true;
} }

简单继承Writable例子 Code

2、所有的key必须实现Comparable接口,在MapReduce过程中需要对Key/Value对进行反复的排序。默认情况下依据Key进行排序的,要实现comparaTo()方法。所以通过Key既要实现Writable接口又要实现Comparable接口,Hadoop中提供了一个公共的接口,叫做WritableComparable接口:

3、由于需要序列化反序列化和进行比较,对java对象需要重写一下几个方法:

①    equals();

②    hashCode();

③    toString()方法

如IntWritable类型的实现:

 package org.apache.hadoop.io;

 import java.io.*;

 /** A WritableComparable for ints. */
public class IntWritable implements WritableComparable {
private int value; public IntWritable() {} public IntWritable(int value) { set(value); } /** Set the value of this IntWritable. */
public void set(int value) { this.value = value; } /** Return the value of this IntWritable. */
public int get() { return value; } public void readFields(DataInput in) throws IOException {
value = in.readInt();
} public void write(DataOutput out) throws IOException {
out.writeInt(value);
} /** Returns true iff <code>o</code> is a IntWritable with the same value. */
public boolean equals(Object o) {
if (!(o instanceof IntWritable))
return false;
IntWritable other = (IntWritable)o;
return this.value == other.value;
} public int hashCode() {
return value;
} /** Compares two IntWritables. */
public int compareTo(Object o) {
int thisValue = this.value;
int thatValue = ((IntWritable)o).value;
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
} public String toString() {
return Integer.toString(value);
}

4、数据类型,必须有一个无参的构造方法,为了方便反射创建对象。

在自定义数据类型中,建议使用java原生数据类型,最好不要使用hadoop对原生类型封装好的数据类型,即如下样例代码:

推荐使用:

不建议使用:

5、问题:

  当数据写入磁盘时,如果要进行排序的话,需要首先从磁盘中读取数据进行反序列化成对象,然后在内存中对反序列化的对象进行比较。

  对字节(未经过反序列化字节)进行直接比较,不需要进行反序列化以后再比较呢?如果要实现上述功能,Hadoop数据类型需要实现一个接口RawComparator。

  在Hadoop中有一个针对Writable数据类型,进行实现的一个通用实现类WritableComparator类。所有的数据类型,只需要继承通用类,再去需要具体功能复写相应的compara()方法。一下以IntWritable为例,查看一下:

对于自定义的Comparator类需要以下几步:

1) 推荐Comparator类定义在数据类型内部,静态内部类,实现WritableComparator类。

  2) 重写默认无参构造方法,方法内必须调用父类有参构造方法,如下截图:

3) 重载父类的compare()方法,依据具体功能覆写。

4) 向WritableComparator类中注册自定义的Comparator类,代码如下:

5、自定义数据类型

  样例代码:

 package org.dragon.hadoop.mr.io;

 import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils; /**
* 自定义数据类型对Writable的实现。
* 快捷键get、set选择器alt+shift+s
* @author ZhuXY
* @time 2016-3-9 下午10:40:02
*
*/
/**
* 1、Hadoop之——数据类型
1) 在hadoop中所有的key/value都必须实现Writable接口,有两个方法,分别用于读(反序列化)和写(序列化)操作。
2) 所有的key必须实现Comparable接口,在MapReduce过程中需要对Key/Value对进行反复的排序。默认情况下依据Key进行排序的,要实现comparaTo()方法。所以通过Key既要实现Writable接口又要实现Comparable接口,Hadoop中提供了一个公共的接口,叫做WritableComparable接口
3) 由于需要序列化反序列化和进行比较,对java对象需要重写一下几个方法:
① equals();
② hashCode();
③ toString()方法
4) 数据类型,必须有一个无参的构造方法,为了方便反射创建对象。
5) 在自定义数据类型中,建议使用java原生数据类型,最好不要使用hadoop对原生类型封装好的数据类型,即 */ /**
* 问题:
当数据写入磁盘时,如果要进行排序的话,需要首先从磁盘中读取数据进行反序列化成对象,然后在内存中对反序列化的对象进行比较。
* 对字节(未经过反序列化字节)进行直接比较,不需要进行反序列化以后再比较呢?如果要实现上述功能,Hadoop数据类型需要实现一个接口RawComparator。
在Hadoop中有一个针对Writable数据类型,进行实现的一个通用实现类WritableComparator类。所有的数据类型,只需要继承通用类,再去需要具体功能复写相应的compara()方法。
对于自定义的Comparator类需要以下几步:
1) 推荐Comparator类定义在数据类型内部,静态内部类,实现WritableComparator类。
2) 重写默认无参构造方法,方法内必须调用父类有参构造方法,如下截图: 3) 重载父类的compare()方法,依据具体功能覆写。
4) 向WritableComparator类中注册自定义的Comparator类,代码如下: */ /**
* WritableCOmparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要的功能。
* 首先他提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象
* 的compare方法。
* 其次,他充当的是RawComparator实例的工厂方法(Writable方法已经注册)。
* @author ZhuXY
*
*/
public class PairWritable implements WritableComparable<PairWritable> { private String name;// Text
private Integer age;// IntWritale public PairWritable() {
} public PairWritable(String name, Integer age) {
this.set(name, age);
} public void set(String name, Integer age) {
this.name = name;
this.age = age;
} public String getName() {
return name;
} public Integer getAge() {
return age;
} /**
* write方法是在写入数据时调用,进行序列化
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
} /**
* readField()方法是在取出数据时调用的方法,反序列化方法
* 以便生成对象
*/
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.age = in.readInt();
} /**
*
hashCode 的常规协定是:
1)在 Java 应用程序执行期间,在同一对象上多次调用 hashCode 方法时,必须一致地返回相同的整数,前提是对象上 equals 比较中所用的信息没有被修改。从某一应用程序的一次执行到同一应用程序的另一次执行,该整数无需保持一致。
2)如果根据 equals(Object) 方法,两个对象是相等的,那么在两个对象中的每个对象上调用 hashCode 方法都必须生成相同的整数结果。
3)以下情况不 是必需的:如果根据 equals(java.lang.Object) 方法,两个对象不相等,那么在两个对象中的任一对象上调用 hashCode 方法必定会生成不同的整数结果。但是,程序员应该知道,为不相等的对象生成不同整数结果可以提高哈希表的性能。
4)实际上,由 Object 类定义的 hashCode 方法确实会针对不同的对象返回不同的整数。(这一般是通过将该对象的内部地址转换成一个整数来实现的,但是 JavaTM 编程语言不需要这种实现技巧。) 5)当equals方法被重写时,通常有必要重写 hashCode 方法,以维护 hashCode 方法的常规协定,该协定声明相等对象必须具有相等的哈希码。
*/
@Override
public int hashCode() {
return name.hashCode() * 31 + age.hashCode();
} @Override
public boolean equals(Object obj) {
if (obj instanceof PairWritable) {
PairWritable pairWritable = (PairWritable) obj; return this.name.equals(pairWritable.getName())
&& this.age.equals(pairWritable.getAge());
}
return false;
} @Override
public String toString() {
// TODO Auto-generated method stub
return this.name+"\t"+this.age;
} @Override
public int compareTo(PairWritable o) {
int cmp=this.name.compareTo(o.getName());
if (cmp!=0) {
return cmp;
}
return this.age.compareTo(o.getAge());
} public static class Comparator extends WritableComparator{ public Comparator(){
super(PairWritable.class);
} /**
* 第一个字节数组
* byte[] b1, int s1, int l1,
* 字节数组起始位置长度
*
* 第二个字节数组
* byte[] b2, int s2, int l2
* 字节数组的起始位置长度
*/
/**
*
* 核心:
* 这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。
* 例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定
* 开始位置(S1和S2)和长度(L1和L2)读取整数(b1和b2),然后直接进行比较。
*/
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int n1=WritableUtils.decodeVIntSize(b1[s1]);
int n2=WritableUtils.decodeVIntSize(b2[s2]); int cmp=WritableComparator.compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2+n2); if (0!=cmp) {
return cmp;
} int thisValue=readInt(b1, l1-s1-n1);
int thatValue=readInt(b2, l2-s2-n2); return (thisValue<thatValue ?-1:(thisValue==thatValue?0:1));
}
static {
WritableComparator.define(PairWritable.class, new Comparator());
}
}
}

PairWritable Code包括Writable和RawComparator

  通常情况下,实现一个静态方法read(DataInput),用于构造数据类型的实例对象,方法内部调用readFields(DataInput)方法

Hadoop MapReduce Data Type中所有的Key,必须实现WritableComparable接口,官方文档说明如下:

  比较器RawComparator,官方文档说明如下:

6、注意NullWritable类型

 package org.apache.hadoop.io;

 import java.io.*;

 /** Singleton Writable with no data. */
public class NullWritable implements WritableComparable { private static final NullWritable THIS = new NullWritable(); private NullWritable() {} // no public ctor /** Returns the single instance of this class. */
public static NullWritable get() { return THIS; } public String toString() {
return "(null)";
} public int hashCode() { return 0; }
public int compareTo(Object other) {
if (!(other instanceof NullWritable)) {
throw new ClassCastException("can't compare " + other.getClass().getName()
+ " to NullWritable");
}
return 0;
}
public boolean equals(Object other) { return other instanceof NullWritable; }
public void readFields(DataInput in) throws IOException {}
public void write(DataOutput out) throws IOException {} /** A Comparator &quot;optimized&quot; for NullWritable. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(NullWritable.class);
} /**
* Compare the buffers in serialized form.
*/
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
assert 0 == l1;
assert 0 == l2;
return 0;
}
} static { // register this comparator
WritableComparator.define(NullWritable.class, new Comparator());
}
}
05-27 09:43