Apache Beam TextIO glob: getting the original file name
Question
I have set up a pipeline that has to parse hundreds of *.gz files, so reading them with a glob works quite well.
But I need the original name of the file currently being processed, because I want to name the result files after the original files.
Can anyone help me here?
Here is my code.
@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);

TextIO.Read read = TextIO.read()
    .withCompressionType(TextIO.CompressionType.GZIP)
    .from(options.getInputFile());
read.getName();
p.apply("ReadLines", read)
    .apply(new CountWords())
    .apply(MapElements.via(new FormatAsTextFn()))
    .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));
p.run().waitUntilFinish();
Recommended answer
This is possible starting with Beam 2.2 using a combination of FileIO.match(), FileIO.readMatches() (written as FileIO.read() when this answer was first posted, before the 2.2 release) and custom code to read lines of text. You can already use this at HEAD, or you can wait until release 2.2 is finalized (it's currently in progress).
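// Imports this snippet relies on:
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, String>> filesAndLines =
    p.apply(FileIO.match().filepattern(...))
     .apply(FileIO.readMatches())
     .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
       @ProcessElement
       public void process(ProcessContext c) throws IOException {
         FileIO.ReadableFile f = c.element();
         // Full resource ID of the matched file, used as the key for every line.
         String filename = f.getMetadata().resourceId().toString();
         String line;
         // BufferedReader needs a Reader, not an InputStream; UTF-8 is assumed here.
         try (BufferedReader r = new BufferedReader(
             Channels.newReader(f.open(), StandardCharsets.UTF_8.name()))) {
           while ((line = r.readLine()) != null) {
             c.output(KV.of(filename, line));
           }
         }
       }
     }));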
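The key of each KV<String, String> above is the full resource ID of the source file, so to name result files after the originals you would still need to reduce that string to a base name. Below is a minimal sketch of one way to do that in plain Java. OutputNames and outputNameFor are hypothetical names that are not part of Beam, the sample path is made up, and the helper assumes '/' as the path separator and a ".gz" suffix, as in the question.

```java
// Hypothetical helper, not part of Apache Beam: turns the resource ID string
// used as the KV key into a base name suitable for an output file.
final class OutputNames {
    private OutputNames() {}

    /** Returns the last '/'-separated segment of resourceId, minus a trailing ".gz". */
    static String outputNameFor(String resourceId) {
        // lastIndexOf returns -1 when there is no '/', so substring(0) keeps the whole string.
        String base = resourceId.substring(resourceId.lastIndexOf('/') + 1);
        String suffix = ".gz"; // assumption: inputs are named *.gz, as in the question
        return base.endsWith(suffix)
            ? base.substring(0, base.length() - suffix.length())
            : base;
    }

    public static void main(String[] args) {
        // Made-up example path; prints "access-01".
        System.out.println(outputNameFor("gs://my-bucket/logs/access-01.gz"));
    }
}
```

A name derived this way could then be passed to a write step that supports per-key destinations (later Beam releases offer FileIO.writeDynamic() for this). That wiring is not shown here and is not part of the original answer.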