glob获取原始文件名

glob获取原始文件名

本文介绍了Apache Beam TextIO glob获取原始文件名的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经建立了一条管道.我必须解析数百个* .gz文件.因此,glob的效果很好.

I have setup a pipeline. I have to parse hundreds of *.gz files. Therefore glob works quite good.

但是我需要当前处理文件的原始名称,因为我想将结果文件命名为原始文件.

But I need the original name of the currently processed file, because i want to name the result files as the original files.

有人可以在这里帮助我吗?

Can anyone help me here?

这是我的代码.

@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);


    TextIO.Read read = TextIO.read().withCompressionType(TextIO.CompressionType.GZIP).from(options.getInputFile());
        read.getName();

        p.apply("ReadLines", read).apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
         .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));

    p.run().waitUntilFinish();

推荐答案

从Beam 2.2开始,可以结合使用FileIO.match()FileIO.read()和自定义代码来读取文本行.您可以在HEAD上使用它,也可以等到2.2版完成(当前正在进行中).

This is possible starting with Beam 2.2 using a combination of FileIO.match(), FileIO.read() and custom code to read lines of text. You can already use this at HEAD, or you can wait until release 2.2 is finalized (it's currently in progress).

PCollection<KV<String, String>> filesAndLines =
  p.apply(FileIO.match().filepattern(...))
   .apply(FileIO.read())
   .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
     @ProcessElement
     public void process(ProcessContext c) {
       ReadableFile f = c.element();
       String filename = f.getMetadata().resourceId().toString();
       String line;
       try (BufferedReader r = new BufferedReader(Channels.newInputStream(f.open()))) {
         while ((line = r.readLine()) != null) {
           c.output(KV.of(filename, line));
         }
       }
     }
   }));

这篇关于Apache Beam TextIO glob获取原始文件名的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-21 01:39