Exception while fetching side input

Problem description

I sometimes get the exception below while running a streaming Dataflow job:

  exception:  "java.lang.RuntimeException: Exception while fetching side input:
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:184)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:175)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.access$400(StreamingModeExecutionContext.java:56)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:401)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:135)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:175)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:117)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.startBundle(ForwardingParDoFn.java:36)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.start(ParDoOperation.java:45)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:719)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:95)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:538)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for 2059
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2207)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4790)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:175)
... 16 more
Caused by: java.lang.IllegalArgumentException: Duplicate values for 2059
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:291)
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:273)
at com.google.cloud.dataflow.sdk.util.PCollectionViews$PCollectionViewBase.fromIterableInternal(PCollectionViews.java:368)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:152)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:104)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4793)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
... 19 more

Dataflow worker machine type: n1-standard-4

Worker cache memory (MB): 2048

Dataflow main input: a PubSub subscription. I am creating a side input from BT and passing it to multiple transformations. The side input is smaller than 100 MB.

Thanks.

Recommended answer

That error indicates that multiple values with the same key (2059) were encountered, which violates the expectation of a Map-valued side input that each key has at most one value. This can happen in streaming, especially if the same value is triggered multiple times. If you use a Multimap instead, you should be able to retrieve all of the values associated with a given key.
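
To make the difference concrete, here is a minimal plain-Java sketch of the two view semantics described above. It is not the Dataflow SDK's implementation: `SideInputViewSketch`, `Entry`, `asMap`, and `asMultimap` are hypothetical names that only model the behavior, and the element values are made up for illustration. In an actual pipeline the change would most likely be a switch from `View.asMap()` to `View.asMultimap()`, after which each lookup returns an `Iterable` of values rather than a single value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Map-style vs. Multimap-style side-input views.
final class SideInputViewSketch {

    /** Stand-in for one key/value element of the side-input collection. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Map-style view: at most one value per key, so a repeated key is rejected. */
    static Map<String, String> asMap(List<Entry> elements) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Entry e : elements) {
            if (view.containsKey(e.key)) {
                // Mirrors the message seen in the stack trace.
                throw new IllegalArgumentException("Duplicate values for " + e.key);
            }
            view.put(e.key, e.value);
        }
        return view;
    }

    /** Multimap-style view: every value seen for a key is kept, in arrival order. */
    static Map<String, List<String>> asMultimap(List<Entry> elements) {
        Map<String, List<String>> view = new LinkedHashMap<>();
        for (Entry e : elements) {
            view.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
        }
        return view;
    }

    public static void main(String[] args) {
        // The same key arrives twice, as can happen when a trigger fires repeatedly.
        List<Entry> elements = new ArrayList<>();
        elements.add(new Entry("2059", "first firing"));
        elements.add(new Entry("2059", "second firing"));
        elements.add(new Entry("7", "only firing"));

        // The multimap-style view keeps both values for key 2059.
        System.out.println(asMultimap(elements).get("2059"));

        // The map-style view rejects the repeated key.
        try {
            asMap(elements);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

With a multimap-style view, the consuming transform has to decide how to treat several values per key, for example by taking the most recent one.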

09-02 00:43