问题描述
我目前正在通过从gcs存储桶中读取过滤信息并将其作为侧面输入传递到管道的不同阶段来创建PCollectionView,以便对输出进行过滤.如果gcs存储桶中的文件发生更改,我希望当前正在运行的管道使用此新的过滤器信息.如果我的过滤器发生更改,是否可以在每个新的数据窗口上更新此PCollectionView?我以为可以在startBundle中做到这一点,但我不知道该怎么做或是否可行.如果可能的话,请您举个例子.
I’m currently creating a PCollectionView by reading filtering information from a gcs bucket and passing it as side input to different stages of my pipeline in order to filter the output. If the file in the gcs bucket changes, I want the currently running pipeline to use this new filter info. Is there a way to update this PCollectionView on each new window of data if my filter changes? I thought I could do it in a startBundle but I can’t figure out how or if it’s possible. Could you give an example if it is possible.
PCollectionView<Map<String, TagObject>>
tagMapView =
pipeline.apply(TextIO.Read.named("TagListTextRead")
.from("gs://tag-list-bucket/tag-list.json"))
.apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
.apply("MakeTagMapView", View.asSingleton());
PCollection<String>
windowedData =
pipeline.apply(PubsubIO.Read.topic("myTopic"))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
PCollection<MY_DATA>
lineData = windowedData
.apply(ParDo.named("ExtractJsonObject")
.withSideInputs(tagMapView)
.of(new ExtractJsonObjectFn()));
推荐答案
您可能想要类似使用最多1分钟版本的过滤器作为辅助输入"(从理论上讲,文件可以更改)频繁,不可预测且独立于管道的情况-因此,无法真正将文件的更改与管道的行为完全同步.
You probably want something like "use an at most a 1-minute-old version of the filter as a side input" (since in theory the file can change frequently, unpredictably, and independently from your pipeline - so there's no way really to completely synchronize changes of the file with the behavior of the pipeline).
这是我能够想到的(授予的,相当笨拙的)解决方案.它依赖于以下事实:窗口还隐式地对侧面输入进行了键控.在此解决方案中,我们将创建一个输入到1分钟固定窗口中的侧面输入,其中每个窗口将包含标签映射的单个值,该值是从该窗口中的某个时刻的过滤器文件得出的.
Here's a (granted, rather clumsy) solution I was able to come up with. It relies on the fact that side inputs are implicitly also keyed by window. In this solution we're going to create a side input windowed into 1-minute fixed windows, where each window will contain a single value of the tag map, derived from the filter file as-of some moment inside that window.
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Count.globally());
// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
.apply(MapElements.via((Long ignored) -> {
... manually read the json file as a TagMap ...
}))
.apply(View.asSingleton());
这种模式(与作为外部输入的缓慢变化的外部数据相结合)反复出现,而我在这里提出的解决方案远非完美,我希望我们在编程模型中对此有更好的支持.我已提交了 BEAM JIRA问题来对此进行跟踪.
This pattern (joining against slowly changing external data as a side input) is coming up repeatedly, and the solution I'm proposing here is far from perfect, I wish we had better support for this in the programming model. I've filed a BEAM JIRA issue to track this.
这篇关于可以通过读取gcs存储桶为每个窗口更新Dataflow sideInput吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!