When I run my Dataflow pipeline with the following steps:
- read from BigQuery
- convert each table row to a JSON string
- insert the documents into Elasticsearch (7.5.2)
it works fine with around 100k records, but with the real dataset (~8M records, ~65 GB) the job throws an exception after roughly 300k records have been inserted (a pipeline sketch and the full worker error are below).
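For context, a minimal sketch of such a pipeline with the Beam Java SDK, assuming placeholder project, dataset, Elasticsearch hosts and index names; the JSON conversion shown (TableRow.toString()) is just one possible implementation, not necessarily the one used in the original job:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryToElasticsearch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply("ReadFromBigQuery",
            BigQueryIO.readTableRows().from("my-project:my_dataset.my_table"))   // placeholder table
     .apply("TableRowToJson",
            MapElements.into(TypeDescriptors.strings())
                       .via((TableRow row) -> row.toString()))                   // TableRow is a GenericJson; toString() yields its JSON form
     .apply("WriteToElasticsearch",
            ElasticsearchIO.write().withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://10.128.0.10:9200"}, "my-index", "_doc"))); // placeholder hosts/index

    p.run();
  }
}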
Error message from worker: java.lang.RuntimeException: unexpected
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:104)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntries(BatchingShuffleEntryReader.java:125)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntriesIfNeeded(BatchingShuffleEntryReader.java:119)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.hasNext(BatchingShuffleEntryReader.java:84)
    org.apache.beam.runners.dataflow.worker.util.common.ForwardingReiterator.hasNext(ForwardingReiterator.java:63)
    org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator.advance(GroupingShuffleEntryIterator.java:109)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.advance(GroupingShuffleReader.java:272)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.start(GroupingShuffleReader.java:266)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}).
Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:196)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2312)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:101)
    ... 21 more
Caused by: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}). Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)
    org.apache.beam.runners.dataflow.worker.ChunkingShuffleBatchReader.read(ChunkingShuffleBatchReader.java:58)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:70)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:66)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    ... 27 more
I also applied the configuration suggested at https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout (larger disk size, more workers), but the job still fails.
My current configuration:
--runner=DataflowRunner \
--numWorkers=10 \
--maxNumWorkers=20 \
--diskSizeGb=150 \
--workerMachineType=n1-standard-1 \
--region=asia-east1
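For reference, these flags are standard Dataflow pipeline options and can be inspected in the pipeline's main method after parsing; a minimal sketch (the class name is a placeholder):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ShowOptions {
  public static void main(String[] args) {
    // The command-line flags above map onto DataflowPipelineOptions getters.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    System.out.println("numWorkers=" + options.getNumWorkers());
    System.out.println("maxNumWorkers=" + options.getMaxNumWorkers());
    System.out.println("diskSizeGb=" + options.getDiskSizeGb());
    System.out.println("workerMachineType=" + options.getWorkerMachineType());
    System.out.println("region=" + options.getRegion());
  }
}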
- Update 1:
Stackdriver logs for the failing workers: [screenshot omitted]
Best answer
Thanks for the replies; I have solved the problem. The root cause was the firewall rules applied to the VM instances that Dataflow creates: connections between the workers have to be allowed. A rule along the lines of the sketch below fixed it.
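A hedged example of such a rule, based on the Dataflow common-errors guide linked above; the rule name and network are placeholders, and TCP ports 12345-12346 are the worker-to-worker shuffle ports (matching the :12346 in the error above):

gcloud compute firewall-rules create allow-dataflow-worker-traffic \
    --network=YOUR_NETWORK \
    --direction=ingress \
    --action=allow \
    --source-tags=dataflow \
    --target-tags=dataflow \
    --rules=tcp:12345-12346

The source/target tag "dataflow" is the network tag that Dataflow attaches to its worker VMs, so the rule only opens traffic between the workers themselves.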
A similar question about this elasticsearch/Dataflow exception can be found on Stack Overflow: https://stackoverflow.com/questions/60260552/