有人可以带我了解DDL生成的类的读写数据的基本工作流程吗?

我已经使用DDL定义了一些类似于结构的记录。例如:

  class Customer {
     ustring FirstName;
     ustring LastName;
     ustring CardNo;
     long LastPurchase;
  }

我已经对此进行了编译,以获取Customer类并将其包含在我的项目中。我可以很容易地看到如何将其用作映射器和化简器的输入和输出(生成的类实现Writable),而不是如何读取和写入文件。

org.apache.hadoop.record包的JavaDoc讨论了以Binary,CSV或XML格式序列化这些记录。我实际上该怎么做?假设我的reducer产生IntWritable键和Customer值。我使用什么OutputFormat以CSV格式写入结果?如果以后要对生成的文件进行分析,我将使用哪种InputFormat读取生成的文件?

最佳答案

好的,所以我想我已经解决了。我不确定这是否是最直接的方法,因此,如果您知道更简单的工作流程,请更正我。

从DDL生成的每个类都实现Record接口(interface),因此提供了两种方法:

serialize(RecordOutput out)用于写入
反序列化(RecordInput in)以进行读取

RecordOutput和RecordInput是org.apache.hadoop.record包中提供的实用程序接口(interface)。有一些实现(例如XMLRecordOutput,BinaryRecordOutput,CSVRecordOutput)

据我所知,您必须实现自己的OutputFormat或InputFormat类才能使用它们。这很容易做到。

例如,我在原始问题中谈到的OutputFormat(以CSV格式写Integer键和Customer值的那个)将像这样实现:


  private static class CustomerOutputFormat
    extends TextOutputFormat<IntWritable, Customer>
  {

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
      JobConf job,
      String name,
      Progressable progress)
    throws IOException {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new CustomerRecordWriter(fileOut);
    }

    protected static class CustomerRecordWriter
      implements RecordWriter<IntWritable, Customer>
    {

      protected DataOutputStream outStream ;

      public AnchorRecordWriter(DataOutputStream out) {
        this.outStream = out ;
      }

      public synchronized void write(IntWritable key, Customer value) throws IOException {

        CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
        csvOutput.writeInteger(key.get(), "id") ;
        value.serialize(csvOutput) ;
      }

      public synchronized void close(Reporter reporter) throws IOException {
        outStream.close();
      }
    }
  }

创建InputFormat几乎相同。由于csv格式是每行一项,因此我们可以在内部使用LineRecordReader来完成大部分工作。


private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {

  public RecordReader<IntWritable, Customer> getRecordReader(
    InputSplit genericSplit,
    JobConf job,
    Reporter reporter)
  throws IOException {

    reporter.setStatus(genericSplit.toString());
    return new CustomerRecordReader(job, (FileSplit) genericSplit);
  }

  private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {

    private LineRecordReader lrr ;

    public CustomerRecordReader(Configuration job, FileSplit split)
    throws IOException{
      this.lrr = new LineRecordReader(job, split);
    }

    public IntWritable createKey() {
      return new IntWritable();
    }

    public Customer createValue() {
      return new Customer();
    }

    public synchronized boolean next(IntWritable key, Customer value)
    throws IOException {

      LongWritable offset = new LongWritable() ;
      Text line = new Text() ;

      if (!lrr.next(offset, line))
        return false ;

      CsvRecordInput cri = new CsvRecordInput(new
        ByteArrayInputStream(line.toString().getBytes())) ;
      key.set(cri.readInt("id")) ;
      value.deserialize(cri) ;

      return true ;
    }

    public float getProgress() {
      return lrr.getProgress() ;
    }

    public synchronized long getPos() throws IOException {
      return lrr.getPos() ;
    }

    public synchronized void close() throws IOException {
      lrr.close();
    }
  }
}

关于hadoop - Hadoop/MapReduce:读写从DDL生成的类,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/2845627/

10-13 07:18