First, consider the following CustomWriter class:

public final class CustomWriter {

  private final SequenceFile.Writer writer;

  CustomWriter(Configuration configuration, Path outputPath) throws IOException {
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
      fileSystem.delete(outputPath, true);
    }

    writer = SequenceFile.createWriter(configuration,
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(ItemWritable.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
        SequenceFile.Writer.blockSize(1024 * 1024),
        SequenceFile.Writer.bufferSize(fileSystem.getConf().getInt("io.file.buffer.size", 4 * 1024)),
        SequenceFile.Writer.replication(fileSystem.getDefaultReplication(outputPath)),
        SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
  }

  public void close() throws IOException {
    writer.close();
  }

  public void write(Item item) throws IOException {
    writer.append(new LongWritable(item.getId()), new ItemWritable(item));
  }
}

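What I am trying to do is consume an asynchronous stream of Item objects. The consumer holds a reference to a CustomWriter instance and calls CustomWriter#write for every item it receives. When the stream ends, CustomWriter#close is called to close the writer.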

As you can see, I create only one writer, and it starts appending to a brand-new file. So this is certainly not the cause.

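I should also note that I am currently running this in a unit-test environment using MiniDFSCluster, following the instructions here. If I run it outside a unit-test environment (i.e. without MiniDFSCluster), it appears to work fine.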

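When I try to read the whole file back, I see the last-written Item object N times (where N is the total number of items received from the stream). Here is an example: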
sparkContext.hadoopFile(path, SequenceFileInputFormat.class, LongWritable.class, ItemWritable.class)
    .collect()
    .forEach(new Consumer<Tuple2<LongWritable, ItemWritable>>() {
      @Override
      public void accept(Tuple2<LongWritable, ItemWritable> tuple) {
        LongWritable id = tuple._1();
        ItemWritable item = tuple._2();
        System.out.println(id.get() + " -> " + item.get());
      }
    });

This prints something like the following:
...
1234 -> Item[...]
1234 -> Item[...]
1234 -> Item[...]
...

Am I doing something wrong, or is this a side effect of using MiniDFSCluster?

Best answer

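Writable objects (e.g. LongWritable, ItemWritable) are reused while the data is being processed. When a record is read, the reader typically just replaces the contents of an existing Writable, so you keep receiving the same Writable object. If you want to collect the records into an array or list, you should first copy them into new objects.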
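To make the reuse effect concrete, here is a minimal sketch in plain Java with no Hadoop or Spark dependency. MutableLong is a hypothetical stand-in for a Writable, and the two collect methods are illustrative names, not part of any library. The first method stores the same reused object for every record; the second copies the value out before the next overwrite.

```java
import java.util.ArrayList;
import java.util.List;

public class WritableReuseDemo {

    // Hypothetical stand-in for a Hadoop Writable: a mutable holder
    // whose contents are overwritten for each new record.
    public static final class MutableLong {
        private long value;
        public void set(long value) { this.value = value; }
        public long get() { return value; }
    }

    // Mimics a reader that reuses ONE holder instance for all records
    // while the caller keeps the reference it is handed each time.
    public static List<MutableLong> collectWithoutCopy(long[] ids) {
        MutableLong reused = new MutableLong();
        List<MutableLong> out = new ArrayList<>();
        for (long id : ids) {
            reused.set(id);   // contents replaced in place
            out.add(reused);  // same reference stored again
        }
        return out;
    }

    // Same reader behaviour, but the value is copied out of the
    // reused holder before the next record overwrites it.
    public static List<Long> collectWithCopy(long[] ids) {
        MutableLong reused = new MutableLong();
        List<Long> out = new ArrayList<>();
        for (long id : ids) {
            reused.set(id);
            out.add(reused.get()); // independent copy of the current value
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ids = {1L, 2L, 1234L};
        for (MutableLong m : collectWithoutCopy(ids)) {
            System.out.println(m.get()); // prints 1234 three times
        }
        System.out.println(collectWithCopy(ids)); // prints [1, 2, 1234]
    }
}
```

In the Spark snippet from the question, the equivalent step would presumably be a map over the RDD that extracts plain values (for example via id.get() and item.get()) before calling collect(), so that the collected list no longer holds references to the reused Writable instances.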
