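I'm having trouble understanding the concept of .withFilenamePolicy on TextIO.write(). The requirements for supplying a FilenamePolicy seem incredibly complex for something as simple as specifying a GCS bucket to write streamed files to.
At a high level, I have JSON messages being streamed to a PubSub topic, and I'd like to write those raw messages out to files in GCS for permanent storage (I'll also be doing other processing on the messages). I initially started with this pipeline, thinking it would be pretty simple: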
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply("Write to GCS", TextIO.write().to(gcs_bucket));
p.run();
}
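I got an error about needing WindowedWrites, which I applied, and then another about needing a FilenamePolicy. This is where things get hairy.
I went to the Beam docs and checked out FilenamePolicy. It looks like I would need to extend this class, which in turn requires extending other abstract classes to make this work. Unfortunately the documentation on Apache is a bit scant, and I can't find any examples for Dataflow 2.0 doing this except for the WordCount example, which even then implements these details in a helper class.
So I could probably make this work by copying much of the WordCount example, but I'm trying to better understand the details. A few questions I have:
1) Is there any roadmap item to abstract away this complexity? It seems like I should be able to supply a GCS bucket as I would in a non-windowed write, and then just supply a few basic options such as the timing and the file naming rule. I know that writing streaming windowed data to files is more complex than just opening a file pointer (or the object storage equivalent).
2) It looks like to make this work I need to create a WindowedContext object, which requires supplying a BoundedWindow abstract class and a PaneInfo object, and then some shard info. The information available for these is pretty bare, and I'm having a hard time knowing what is actually needed for all of them, especially given my simple use case. Are there any good examples that implement these? In addition, it looks like I need to set the number of shards as part of TextIO.write, and then also supply the number of shards as part of the FilenamePolicy?
Thanks for any help in understanding the details behind this, hoping to learn a few things!
EDIT 7/20/17
So I finally got this pipeline to run by extending FilenamePolicy. My challenge was needing to define the window for the streaming data from PubSub. Here is a pretty close representation of the code: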
public class ReadData {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Write to GCS", TextIO.write().to("gcs_bucket")
.withWindowedWrites()
.withFilenamePolicy(new TestPolicy())
.withNumShards(10));
p.run();
}
}
class TestPolicy extends FileBasedSink.FilenamePolicy {
@Override
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
String filename = String.format(
"%s-%s-%s-%s-of-%s.json",
"test",
window.start().toString(),
window.end().toString(),
context.getShardNumber(),
context.getNumShards()
);
return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
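A windowed filename like the one built above usually encodes the window bounds, the index of the current shard, and the total number of shards. The following is a minimal plain-Java sketch of just that string construction, with no Beam dependency. The class `WindowNames` and the method `windowedName` are hypothetical names, and the parameters stand in for the values a WindowedContext would supply at runtime.

```java
import java.time.Instant;

// Hypothetical helper, not part of Beam: shows only the naming logic.
final class WindowNames {
    // Builds "<prefix>-<start>-<end>-<shard>-of-<numShards>.json".
    // shard is the index of this file; numShards is the total shard count.
    static String windowedName(
            String prefix, Instant start, Instant end, int shard, int numShards) {
        return String.format(
                "%s-%s-%s-%d-of-%d.json", prefix, start, end, shard, numShards);
    }

    public static void main(String[] args) {
        // A one-minute window starting at the epoch, shard 3 of 10.
        System.out.println(windowedName(
                "test", Instant.ofEpochSecond(0), Instant.ofEpochSecond(60), 3, 10));
    }
}
```

Keeping the shard index and the shard count as separate arguments makes it harder to pass the same value twice by accident.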
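Best answer
In Beam 2.0, the following is an example of writing the raw messages from PubSub out into windowed files on GCS. The pipeline is fairly configurable: you can specify the window duration via a parameter, and a sub-directory policy if you want logical subsections of your data for ease of reprocessing or archiving. Note that this has an additional dependency on Apache Commons Lang 3.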
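PubsubToGcs
// Imports this class appears to need (package names as in Beam 2.x; adjust for your SDK version).
// SubDirectoryPolicy is the nested enum of WindowedFilenamePolicy, assumed to be in the same package.
import com.google.common.base.Preconditions;
import java.util.Locale;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.MutablePeriod;
import org.joda.time.format.PeriodFormatterBuilder;
import org.joda.time.format.PeriodParser;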
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and
* outputs the raw data into windowed files at the specified output
* directory.
*/
public class PubsubToGcs {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.</p>
*/
public static interface Options extends DataflowPipelineOptions, StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
ValueProvider<String> getTopic();
void setTopic(ValueProvider<String> value);
@Description("The directory to output files to. Must end with a slash.")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description("The filename prefix of the files to write to.")
@Default.String("output")
@Required
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@Description("The shard template of the output file. Specified as repeating sequences "
+ "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
+ "shard number, or number of shards respectively")
@Default.String("")
ValueProvider<String> getShardTemplate();
void setShardTemplate(ValueProvider<String> value);
@Description("The suffix of the files to write.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@Description("The sub-directory policy which files will use when output per window.")
@Default.Enum("NONE")
SubDirectoryPolicy getSubDirectoryPolicy();
void setSubDirectoryPolicy(SubDirectoryPolicy value);
@Description("The window duration in which data will be written. Defaults to 5m. "
+ "Allowed formats are: "
+ "Ns (for seconds, example: 5s), "
+ "Nm (for minutes, example: 12m), "
+ "Nh (for hours, example: 2h).")
@Default.String("5m")
String getWindowDuration();
void setWindowDuration(String value);
@Description("The maximum number of output shards produced when writing.")
@Default.Integer(10)
Integer getNumShards();
void setNumShards(Integer value);
}
/**
* Main entry point for executing the pipeline.
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(Options.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
pipeline
.apply("Read PubSub Events",
PubsubIO
.readStrings()
.fromTopic(options.getTopic()))
.apply(options.getWindowDuration() + " Window",
Window
.into(FixedWindows.of(parseDuration(options.getWindowDuration()))))
.apply("Write File(s)",
TextIO
.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(options.getOutputDirectory())
.withFilenamePolicy(
new WindowedFilenamePolicy(
options.getOutputFilenamePrefix(),
options.getShardTemplate(),
options.getOutputFilenameSuffix())
.withSubDirectoryPolicy(options.getSubDirectoryPolicy())));
// Execute the pipeline and return the result.
PipelineResult result = pipeline.run();
return result;
}
/**
* Parses a duration from a period formatted string. Values
* are accepted in the following formats:
* <p>
* Ns - Seconds. Example: 5s<br>
* Nm - Minutes. Example: 13m<br>
* Nh - Hours. Example: 2h
*
* <pre>
* parseDuration(null) = NullPointerException()
* parseDuration("") = Duration.standardSeconds(0)
* parseDuration("2s") = Duration.standardSeconds(2)
* parseDuration("5m") = Duration.standardMinutes(5)
* parseDuration("3h") = Duration.standardHours(3)
* </pre>
*
* @param value The period value to parse.
* @return The {@link Duration} parsed from the supplied period string.
*/
private static Duration parseDuration(String value) {
Preconditions.checkNotNull(value, "The specified duration must be a non-null value!");
PeriodParser parser = new PeriodFormatterBuilder()
.appendSeconds().appendSuffix("s")
.appendMinutes().appendSuffix("m")
.appendHours().appendSuffix("h")
.toParser();
MutablePeriod period = new MutablePeriod();
parser.parseInto(period, value, 0, Locale.getDefault());
Duration duration = period.toDurationFrom(new DateTime(0));
return duration;
}
}
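To make the windowing step above more concrete, here is a small sketch using only the JDK. It swaps in `java.time` and a regular expression instead of the Joda-Time parser used in the answer, and it only accepts a single `Ns`, `Nm` or `Nh` value (it rejects the empty string rather than returning zero). The `windowStart` method assumes epoch-aligned fixed windows, which as far as I know is how FixedWindows behaves with its default offset. `WindowSketch` and both method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical JDK-only sketch; not the Joda-based parser from the answer.
final class WindowSketch {
    private static final Pattern PERIOD = Pattern.compile("(\\d+)([smh])");

    // Parses "5s", "12m" or "2h" into a java.time.Duration.
    static Duration parseDuration(String value) {
        Matcher m = PERIOD.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long n = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s": return Duration.ofSeconds(n);
            case "m": return Duration.ofMinutes(n);
            default:  return Duration.ofHours(n); // only "h" can reach here
        }
    }

    // Start of the epoch-aligned fixed window that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }
}
```

With a `5m` window, a message stamped 2017-07-20T10:07:30Z falls into the window starting at 10:05:00Z, so every message between 10:05:00 and 10:10:00 ends up in the same set of output files.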
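WindowedFilenamePolicy
// Imports this class appears to need (package names as in Beam 2.x; adjust for your SDK version).
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;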
/**
* The {@link WindowedFilenamePolicy} class will output files
* to the specified location with a format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
*/
@SuppressWarnings("serial")
public class WindowedFilenamePolicy extends FilenamePolicy {
/**
* Possible sub-directory creation modes.
*/
public static enum SubDirectoryPolicy {
NONE("."),
PER_HOUR("yyyy-MM-dd/HH"),
PER_DAY("yyyy-MM-dd");
private final String subDirectoryPattern;
private SubDirectoryPolicy(String subDirectoryPattern) {
this.subDirectoryPattern = subDirectoryPattern;
}
public String getSubDirectoryPattern() {
return subDirectoryPattern;
}
public String format(Instant instant) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(subDirectoryPattern);
return formatter.print(instant);
}
}
/**
* The formatter used to format the window timestamp for outputting to the filename.
*/
private static final DateTimeFormatter formatter = ISODateTimeFormat
.basicDateTimeNoMillis()
.withZone(DateTimeZone.getDefault());
/**
* The filename prefix.
*/
private final ValueProvider<String> prefix;
/**
* The filename suffix.
*/
private final ValueProvider<String> suffix;
/**
* The shard template used during file formatting.
*/
private final ValueProvider<String> shardTemplate;
/**
* The policy which dictates when or if sub-directories are created
* for the windowed file output.
*/
private ValueProvider<SubDirectoryPolicy> subDirectoryPolicy = StaticValueProvider.of(SubDirectoryPolicy.NONE);
/**
* Constructs a new {@link WindowedFilenamePolicy} with the
* supplied prefix used for output files.
*
* @param prefix The prefix to prepend to all files output by the policy.
* @param shardTemplate The template used to create uniquely named sharded files.
* @param suffix The suffix to append to all files output by the policy.
*/
public WindowedFilenamePolicy(String prefix, String shardTemplate, String suffix) {
this(StaticValueProvider.of(prefix),
StaticValueProvider.of(shardTemplate),
StaticValueProvider.of(suffix));
}
/**
* Constructs a new {@link WindowedFilenamePolicy} with the
* supplied prefix used for output files.
*
* @param prefix The prefix to prepend to all files output by the policy.
* @param shardTemplate The template used to create uniquely named sharded files.
* @param suffix The suffix to append to all files output by the policy.
*/
public WindowedFilenamePolicy(
ValueProvider<String> prefix,
ValueProvider<String> shardTemplate,
ValueProvider<String> suffix) {
this.prefix = prefix;
this.shardTemplate = shardTemplate;
this.suffix = suffix;
}
/**
* The subdirectory policy will create sub-directories on the
* filesystem based on the window which has fired.
*
* @param policy The subdirectory policy to apply.
* @return The filename policy instance.
*/
public WindowedFilenamePolicy withSubDirectoryPolicy(SubDirectoryPolicy policy) {
return withSubDirectoryPolicy(StaticValueProvider.of(policy));
}
/**
* The subdirectory policy will create sub-directories on the
* filesystem based on the window which has fired.
*
* @param policy The subdirectory policy to apply.
* @return The filename policy instance.
*/
public WindowedFilenamePolicy withSubDirectoryPolicy(ValueProvider<SubDirectoryPolicy> policy) {
this.subDirectoryPolicy = policy;
return this;
}
/**
* The windowed filename method will construct filenames per window in the
* format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
*/
@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext c, String extension) {
Instant windowInstant = c.getWindow().maxTimestamp();
String datetimeStr = formatter.print(windowInstant.toDateTime());
// Remove the prefix when it is null so we don't append the literal 'null'
// to the start of the filename
String filenamePrefix = prefix.get() == null ? datetimeStr : prefix.get() + "-" + datetimeStr;
String filename = DefaultFilenamePolicy.constructName(
filenamePrefix,
shardTemplate.get(),
StringUtils.defaultIfBlank(suffix.get(), extension), // Ignore the extension in favor of the suffix.
c.getShardNumber(),
c.getNumShards());
String subDirectory = subDirectoryPolicy.get().format(windowInstant);
return outputDirectory
.resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
/**
* Unwindowed writes are unsupported by this filename policy so an {@link UnsupportedOperationException}
* will be thrown if invoked.
*/
@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
throw new UnsupportedOperationException("There is no windowed filename policy for unwindowed file"
+ " output. Please use the WindowedFilenamePolicy with windowed writes or switch filename policies.");
}
}
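Two pieces of the policy above are easier to follow in isolation: how a shard template such as `SSS-NNN` turns into digits, and how a sub-directory pattern such as `yyyy-MM-dd/HH` is applied to the window timestamp. The sketch below reimplements both with the JDK only. It is an approximation based on the option description in the answer (runs of `S` become the zero-padded shard number, runs of `N` the zero-padded shard count), not the actual `DefaultFilenamePolicy.constructName` code, and it formats in UTC rather than the default time zone. `NamingSketch` and its methods are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical JDK-only sketch of the naming rules described in the answer.
final class NamingSketch {
    private static final Pattern SHARD_RUN = Pattern.compile("S+|N+");

    // Replaces each run of 'S' with the shard number and each run of 'N'
    // with the shard count, zero-padded to the length of the run.
    static String expandShardTemplate(String template, int shard, int numShards) {
        Matcher m = SHARD_RUN.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            int value = m.group().charAt(0) == 'S' ? shard : numShards;
            String padded = String.format("%0" + m.group().length() + "d", value);
            m.appendReplacement(out, padded);
        }
        m.appendTail(out);
        return out.toString();
    }

    // Formats the window timestamp into a relative sub-directory path (UTC).
    static String subDirectory(String pattern, Instant windowInstant) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(windowInstant);
    }
}
```

For example, the template `-SSS-of-NNN` with shard 3 of 10 expands to `-003-of-010`, and the `PER_HOUR` pattern applied to a window ending at 2017-07-20T10:09:59Z gives the sub-directory `2017-07-20/10`.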