I am using:

  • hadoop-client 2.2.0
  • mrunit 1.0.0
  • avro 1.7.6
  • avro-mrunit 1.7.6

  ...and the whole thing is built and tested with Maven.

    Until I followed the instructions in "MRUnit with Avro NullPointerException in Serialization", I kept getting a NullPointerException.

    Now I get an InstantiationException instead:
    Running mypackage.MyTest
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    2014-03-23 20:49:21.463 java[27994:1003] Unable to load realm info from SCDynamicStore
    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.945 sec <<< FAILURE!
    process(mypackage.MyTest)  Time elapsed: 0.909 sec  <<< ERROR!
    java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
        at org.apache.hadoop.io.serializer.SerializationFactory.add(SerializationFactory.java:72)
        at org.apache.hadoop.io.serializer.SerializationFactory.<init>(SerializationFactory.java:63)
        at org.apache.hadoop.mrunit.internal.io.Serialization.<init>(Serialization.java:37)
        at org.apache.hadoop.mrunit.TestDriver.getSerialization(TestDriver.java:464)
        at org.apache.hadoop.mrunit.TestDriver.copy(TestDriver.java:608)
        at org.apache.hadoop.mrunit.TestDriver.copyPair(TestDriver.java:612)
        at org.apache.hadoop.mrunit.MapDriverBase.addInput(MapDriverBase.java:118)
        at org.apache.hadoop.mrunit.MapDriverBase.withInput(MapDriverBase.java:207)
        at mypackage.MyTest.process(MyTest.java:92)
    ...
    

    The Avro model looks like this:
    {
        "namespace": "model",
        "type": "record",
        "name": "Blob",
        "fields": [
            { "name": "value", "type": "string" }
        ]
    }
    

    The mapper looks like this:
    public class MyMapper
        extends Mapper<AvroKey<Blob>, NullWritable, LongWritable, NullWritable>
    {
        @Override
        public void map(AvroKey<Blob> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(0), NullWritable.get());
        }
    }
    

    The failing test (currently my only test) looks like this:
    @Test
    public void process() throws IOException {
        mapper = new MyMapper();
        job = Job.getInstance();
        mapDriver = MapDriver.newMapDriver(mapper);
    
        Configuration configuration = mapDriver.getConfiguration();
        //Copy over the default io.serializations. If you don't do this then you will
        //not be able to deserialize the inputs to the mapper
        String[] serializations = configuration.getStrings("io.serializations");
        serializations = Arrays.copyOf(serializations, serializations.length + 1);
        serializations[serializations.length-1] = AvroSerialization.class.getName();
        configuration.setStrings("io.serializations", serializations);
    
        //Configure AvroSerialization by specifying the key writer and value writer schemas
        configuration.setStrings("avro.serialization.key.writer.schema", Schema.create(Schema.Type.LONG).toString(true));
        configuration.setStrings("avro.serialization.value.writer.schema", Schema.create(Schema.Type.NULL).toString(true));
    
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, Blob.SCHEMA$);
        job.setOutputKeyClass(LongWritable.class);
    
        input = Blob.newBuilder()
            .setValue("abc")
            .build();
        mapDriver
            .withInput(new AvroKey<Blob>(input), NullWritable.get())
            .withOutput(new LongWritable(0), NullWritable.get())
            .runTest();
    }
    

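    I am still quite new to Avro and MRUnit, so I am still trying to understand how they work together. The unit test output also shows log4j warnings, and I am not sure whether they are part of the problem (I doubt it).

    Best answer

    Try the following. Although the error is thrown from ReflectionUtils, the outer stack frames are all about serialization, and your input key type (AvroKey) does not implement Writable.
    So I think the Avro serialization is probably not configured correctly.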

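            // Driver types match MyMapper from the question.
            MapDriver<AvroKey<Blob>, NullWritable, LongWritable, NullWritable> driver =
                    MapDriver.newMapDriver(new MyMapper());

            // Register Avro serialization and give it the schema of the AvroKey input.
            // AvroSerialization here is org.apache.avro.hadoop.io.AvroSerialization (avro-mapred).
            Configuration conf = driver.getConfiguration();
            AvroSerialization.addToConfiguration(conf);
            AvroSerialization.setKeyWriterSchema(conf, Blob.SCHEMA$);
            AvroSerialization.setKeyReaderSchema(conf, Blob.SCHEMA$);

            Job job = Job.getInstance(conf);
            // job.set...      (your job settings)
            // AvroJob.set...  (your Avro job settings)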
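
    As background on the exception itself: the stack trace shows SerializationFactory instantiating every class listed in io.serializations through ReflectionUtils.newInstance, which wraps reflective failures in a RuntimeException. Java reflection throws InstantiationException when the class it is asked to construct is abstract. The question does not show its imports, so the following is only a guess: Hadoop ships an abstract org.apache.hadoop.io.serializer.avro.AvroSerialization, while the concrete class that provides addToConfiguration is org.apache.avro.hadoop.io.AvroSerialization, and importing the former by accident would produce this kind of error. The sketch below uses only the JDK and hypothetical stand-in classes (AbstractSerialization, ConcreteSerialization) to reproduce the mechanism; it is not Hadoop's actual code.

```java
import java.lang.reflect.Constructor;

public class InstantiationDemo {

    // Hypothetical stand-in for an abstract serialization base class.
    public abstract static class AbstractSerialization {
        public AbstractSerialization() {}
    }

    // Hypothetical stand-in for a concrete, instantiable serialization.
    public static class ConcreteSerialization extends AbstractSerialization {
        public ConcreteSerialization() {}
    }

    // Roughly what a reflection-based factory does: find the no-arg
    // constructor, invoke it, and wrap any failure in a RuntimeException.
    public static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<T> ctor = cls.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Reports either the created type or the underlying cause of the failure.
    public static String describe(Class<?> cls) {
        try {
            return "created " + newInstance(cls).getClass().getSimpleName();
        } catch (RuntimeException e) {
            return "failed: " + e.getCause().getClass().getName();
        }
    }

    public static void main(String[] args) {
        // prints "created ConcreteSerialization"
        System.out.println(describe(ConcreteSerialization.class));
        // prints "failed: java.lang.InstantiationException"
        System.out.println(describe(AbstractSerialization.class));
    }
}
```

    If this is what is happening in the test, checking which AvroSerialization the test class imports is a quick first step; calling AvroSerialization.addToConfiguration(conf) only compiles against the avro-mapred class, so it also guards against the mix-up.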
    
