在我的MapReduce作业中,我使用AvroParquetOutputFormat使用Avro模式写入Parquet文件。
应用程序逻辑要求由Reducer创建多种类型的文件,并且每个文件都有其自己的Avro模式。
AvroParquetOutputFormat类具有一个静态方法setSchema()来设置输出的Avro模式。查看代码,AvroParquetOutputFormat使用AvroWriteSupport.setSchema(),它也是一个静态实现。
在不扩展AvroWriteSupport和修改逻辑的情况下,是否有更简单的方法在单个MR作业中从AvroParquetOutputFormat实现多个Avro模式输出?
任何指针/输入高度赞赏。
感谢和问候
MK
最佳答案
回答可能为时已晚,但是我也遇到了这个问题并提出了解决方案。
首先,没有像MultipleAvroParquetOutputFormat
内置的'parquet-mr
'这样的支持。但是要实现类似的行为,我使用了MultipleOutputs
。
对于仅 map 的工作,将您的 map 绘制者如下:
public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord>{
protected KafkaAvroDecoder deserializer;
protected String outputPath = "";
// Using MultipleOutputs to write custom named files
protected MultipleOutputs<Void, GenericRecord> mos;
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
outputPath = conf.get(FileOutputFormat.OUTDIR);
mos = new MultipleOutputs<Void, GenericRecord>(context);
}
public void map(LongWritable ln, BytesWritable value, Context context){
try {
GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema());
Schema schema = record.getSchema();
String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm
mos.write( (Void) null, record, mergeEventsPath);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
这将为每个架构创建一个新的
RecordWriter
,并创建一个新的 Parquet 文件,并在其后附加该架构名称,例如schema1-r-0000.parquet。这还将基于驱动程序中设置的架构创建默认的part-r-0000x.parquet文件。为了避免这种情况,请使用
LazyOutputFormat
像这样:LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);
希望这可以帮助。