Streaming应用程序中

Streaming应用程序中

本文介绍了如何修复"org.apache.spark.shuffle.FetchFailedException:连接失败"在NetworkWordCount Spark Streaming应用程序中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试提交示例Apache Spark Streaming应用程序:

I try submit example Apache Spark Streaming application:

/opt/spark/bin/spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount --deploy-mode cluster --master yarn --driver-memory 2g --executor-memory 2g /opt/spark/examples/jars/spark-examples_2.11-2.0.0.jar 172.29.74.68 9999

作为参数,我输入主IP和本地端口(在另一个控制台中运行: nc -lk 9999 ).

As parameters I type master IP and local port (in another console is running: nc -lk 9999).

总是出现错误:

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 50, iws1): FetchFailed(BlockManagerId(2, iws2, 41569), shuffleId=0, mapId=19, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to iws2/172.29.77.40:41569
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to iws2/172.29.77.40:41569
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: iws2/172.29.77.40:41569
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more

此处完整记录

示例"HdfsWordCount"正常工作.其他非流式传输"申请.

Example "HdfsWordCount" works correctly. Other "non-streaming" application also.

推荐答案

解决方案是将 StorageLevel.MEMORY_ONLY_SER 添加到 socketTextStream 方法中,更改 spark-defaults.conf (如下所示)并增加 yarn-site.xml 文件中的硬件资源.

The solution was to add StorageLevel.MEMORY_ONLY_SER to socketTextStream method, change spark-defaults.conf (as below) and increase hardware resources in yarn-site.xml file.

spark-defaults.conf

spark.core.connection.ack.wait.timeout 600s
spark.default.parallelism 4
spark.driver.memory 6g
spark.executor.memory 10g
spark.cores.max 4
spark.executor.cores 2

这篇关于如何修复"org.apache.spark.shuffle.FetchFailedException:连接失败"在NetworkWordCount Spark Streaming应用程序中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-24 05:10