I am trying to set up the dumbo driver extension for a Hadoop streaming job that I am writing for the Python plugin of mongo-hadoop.

The dumbo project requires me to use the TypedBytesWritable class, so I created a new InputFormat and RecordReader like this:

package com.mongodb.hadoop;

public class TypedBytesTableInputFormat implements InputFormat<TypedBytesWritable, TypedBytesWritable> {

@Override
public RecordReader<TypedBytesWritable, TypedBytesWritable> getRecordReader(InputSplit split,
                                                                            JobConf job,
                                                                            Reporter reporter) {

    if (!(split instanceof MongoInputSplit))
        throw new IllegalStateException("Creation of a new RecordReader requires a MongoInputSplit instance.");

    final MongoInputSplit mis = (MongoInputSplit) split;

    //**THE FOLLOWING LINE THROWS THE ERROR**
    return (RecordReader<TypedBytesWritable, TypedBytesWritable>) new TypedBytesMongoRecordReader(mis);
}

Here is the extended RecordReader:
package com.mongodb.hadoop.input;
...
...
import org.apache.hadoop.mapreduce.RecordReader;
...
...

public class TypedBytesMongoRecordReader extends RecordReader<TypedBytesWritable, TypedBytesWritable> {

public TypedBytesMongoRecordReader(MongoInputSplit mis) {
    _cursor = mis.getCursor();
}

@Override
public void close() {
    if ( _cursor != null )
        _cursor.close();
}

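However, when I run the job, the line marked above throws the error. I am not sure why, since TypedBytesMongoRecordReader is a subclass of RecordReader. What am I doing wrong? Here is the API documentation for the RecordReader class; I thought I was doing everything correctly: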

http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html

I do get a warning on the line where I cast to RecordReader, but no error, and the jar builds fine. The warning:
Type safety: Unchecked cast from TypedBytesMongoRecordReader to RecordReader<TypedBytesWritable,TypedBytesWritable>
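
A note on why a cast like this can compile with only a warning and still fail at run time. The `getRecordReader(InputSplit, JobConf, Reporter)` signature belongs to the old `org.apache.hadoop.mapred` API, where `RecordReader` is an interface, while the reader above extends `org.apache.hadoop.mapreduce.RecordReader`, an unrelated abstract class with the same simple name. Assuming the error is a `ClassCastException` (the question does not say), the sketch below reproduces the situation without Hadoop. `OldApiRecordReader`, `NewApiRecordReader` and `MongoLikeReader` are hypothetical, simplified stand-ins, not the real Hadoop types.

```java
public class CastDemo {
    // Simplified stand-in for the old-API interface org.apache.hadoop.mapred.RecordReader.
    interface OldApiRecordReader<K, V> {
        boolean next(K key, V value);
    }

    // Simplified stand-in for the new-API abstract class org.apache.hadoop.mapreduce.RecordReader.
    static abstract class NewApiRecordReader<K, V> {
        abstract boolean nextKeyValue();
    }

    // Hypothetical reader that, like TypedBytesMongoRecordReader, extends the new-API class.
    static class MongoLikeReader extends NewApiRecordReader<String, String> {
        @Override
        boolean nextKeyValue() {
            return false;
        }
    }

    @SuppressWarnings("unchecked")
    static OldApiRecordReader<String, String> getRecordReader() {
        // The compiler accepts this: a non-final class may be cast to any interface,
        // because some subclass could implement it. It only reports an unchecked-cast warning.
        // At run time the object does not implement the interface, so the cast fails.
        return (OldApiRecordReader<String, String>) new MongoLikeReader();
    }

    // Returns "ok" if the cast succeeds, otherwise the name of the failure.
    static String tryCast() {
        try {
            getRecordReader();
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCast()); // prints "ClassCastException"
    }
}
```

If this is what is happening, checking which `RecordReader` each file imports (`mapred` versus `mapreduce`) is a reasonable first step.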

Best answer

Try this:

public <T extends RecordReader<TypedBytesWritable, TypedBytesWritable>> T getRecordReader(InputSplit split, JobConf job, Reporter reporter) {

    if (!(split instanceof MongoInputSplit))
        throw new IllegalStateException("Creation of a new RecordReader requires a MongoInputSplit instance.");

    final MongoInputSplit mis = (MongoInputSplit) split;

    return new TypedBytesMongoRecordReader(mis); // you may need a cast (T) - try it without first
}
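
If the failure turns out to come from mixing the old `mapred` and new `mapreduce` APIs rather than from generics, a different option is to wrap the new-API reader in a small adapter that implements the old-API interface. The sketch below is not the mongo-hadoop or Hadoop API: `OldApiReader`, `NewApiReader`, `ListReader` and `Adapter` are hypothetical stand-ins that keep only the value side of each contract, to show the shape of such a wrapper.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AdapterSketch {
    // Simplified stand-in for the old-API interface: the caller supplies a holder that is filled in.
    interface OldApiReader<V> {
        V createValue();
        boolean next(V value);
    }

    // Simplified stand-in for the new-API abstract class: the reader owns the current value.
    static abstract class NewApiReader<V> {
        abstract boolean nextKeyValue();
        abstract V getCurrentValue();
    }

    // Hypothetical new-API style reader over an in-memory list.
    static class ListReader extends NewApiReader<String> {
        private final Iterator<String> it;
        private String current;

        ListReader(List<String> rows) {
            this.it = rows.iterator();
        }

        @Override
        boolean nextKeyValue() {
            if (!it.hasNext()) return false;
            current = it.next();
            return true;
        }

        @Override
        String getCurrentValue() {
            return current;
        }
    }

    // Adapter: implements the old-API interface by delegating to a new-API reader.
    static class Adapter implements OldApiReader<StringBuilder> {
        private final NewApiReader<String> delegate;

        Adapter(NewApiReader<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public StringBuilder createValue() {
            return new StringBuilder();
        }

        @Override
        public boolean next(StringBuilder value) {
            if (!delegate.nextKeyValue()) return false;
            value.setLength(0);                       // reuse the caller's holder
            value.append(delegate.getCurrentValue()); // copy the current record into it
            return true;
        }
    }

    // Reads every record through the old-API interface and joins them with ';'.
    static String drain(List<String> rows) {
        OldApiReader<StringBuilder> reader = new Adapter(new ListReader(rows));
        StringBuilder value = reader.createValue();
        StringBuilder out = new StringBuilder();
        while (reader.next(value)) {
            out.append(value).append(';');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("a", "b"))); // prints "a;b;"
    }
}
```

With the real classes, the adapter would implement every method of the old-API `RecordReader` interface and copy both key and value; that part is left out here.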
