有人可以带我了解DDL生成的类的读写数据的基本工作流程吗?
我已经使用DDL定义了一些类似于结构的记录。例如:
class Customer {
ustring FirstName;
ustring LastName;
ustring CardNo;
long LastPurchase;
}
我已经对此进行了编译,以获取Customer类并将其包含在我的项目中。我可以很容易地看到如何将其用作映射器和化简器的输入和输出(生成的类实现Writable),而不是如何读取和写入文件。
org.apache.hadoop.record包的JavaDoc讨论了以Binary,CSV或XML格式序列化这些记录。我实际上该怎么做?假设我的reducer产生IntWritable键和Customer值。我使用什么OutputFormat以CSV格式写入结果?如果以后要对生成的文件进行分析,我将使用哪种InputFormat读取生成的文件?
最佳答案
好的,所以我想我已经解决了。我不确定这是否是最直接的方法,因此,如果您知道更简单的工作流程,请更正我。
从DDL生成的每个类都实现Record接口(interface),因此提供了两种方法:
serialize(RecordOutput out)用于写入
反序列化(RecordInput in)以进行读取
RecordOutput和RecordInput是org.apache.hadoop.record包中提供的实用程序接口(interface)。有一些实现(例如XMLRecordOutput,BinaryRecordOutput,CSVRecordOutput)
据我所知,您必须实现自己的OutputFormat或InputFormat类才能使用它们。这很容易做到。
例如,我在原始问题中谈到的OutputFormat(以CSV格式写Integer键和Customer值的那个)将像这样实现:
private static class CustomerOutputFormat
extends TextOutputFormat<IntWritable, Customer>
{
public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new CustomerRecordWriter(fileOut);
}
protected static class CustomerRecordWriter
implements RecordWriter<IntWritable, Customer>
{
protected DataOutputStream outStream ;
public AnchorRecordWriter(DataOutputStream out) {
this.outStream = out ;
}
public synchronized void write(IntWritable key, Customer value) throws IOException {
CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
csvOutput.writeInteger(key.get(), "id") ;
value.serialize(csvOutput) ;
}
public synchronized void close(Reporter reporter) throws IOException {
outStream.close();
}
}
}
创建InputFormat几乎相同。由于csv格式是每行一项,因此我们可以在内部使用LineRecordReader来完成大部分工作。
private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {
public RecordReader<IntWritable, Customer> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new CustomerRecordReader(job, (FileSplit) genericSplit);
}
private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {
private LineRecordReader lrr ;
public CustomerRecordReader(Configuration job, FileSplit split)
throws IOException{
this.lrr = new LineRecordReader(job, split);
}
public IntWritable createKey() {
return new IntWritable();
}
public Customer createValue() {
return new Customer();
}
public synchronized boolean next(IntWritable key, Customer value)
throws IOException {
LongWritable offset = new LongWritable() ;
Text line = new Text() ;
if (!lrr.next(offset, line))
return false ;
CsvRecordInput cri = new CsvRecordInput(new
ByteArrayInputStream(line.toString().getBytes())) ;
key.set(cri.readInt("id")) ;
value.deserialize(cri) ;
return true ;
}
public float getProgress() {
return lrr.getProgress() ;
}
public synchronized long getPos() throws IOException {
return lrr.getPos() ;
}
public synchronized void close() throws IOException {
lrr.close();
}
}
}
关于hadoop - Hadoop/MapReduce:读写从DDL生成的类,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/2845627/