saveAsTextFile hangs in Spark: java.io.IOException: Connection reset by peer when working with DataFrames

This article describes how to deal with saveAsTextFile hanging in Spark with java.io.IOException: Connection reset by peer when diffing DataFrames, and should be a useful reference for anyone who runs into the same problem.

Problem description

I am running an application in Spark that computes a simple diff between two DataFrames. I execute it as a jar file in my cluster environment, which is a 94-node cluster. There are two data sets, 2 GB and 4 GB, which are mapped to DataFrames.

My job works fine for very small files.

I personally think saveAsTextFile takes the most time in my application. Below are my cluster config details:

Total Vmem allocated for Containers     394.80 GB
Total VCores allocated for Containers   36

This is how I run my Spark job:

spark-submit --queue root.queue --deploy-mode client --master yarn SparkApplication-SQL-jar-with-dependencies.jar

Here is my code:

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType }

object TestDiff {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount")

    conf.set("spark.executor.memory", "32g")
    conf.set("spark.driver.memory", "32g")
    conf.set("spark.driver.maxResultSize", "4g")

    // Creating the Spark context and SQL context
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Both input files are pipe-delimited and share the same schema
    val schema = StructType(Array(
      StructField("filler1", StringType),
      StructField("dunsnumber", StringType),
      StructField("transactionalindicator", StringType)))

    // First data set
    val textRdd1 = sc.textFile("/home/cloudera/TRF/PCFP/INCR")
    val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|", -1)))
    val df1 = sqlContext.createDataFrame(rowRdd1, schema)

    // Second data set
    val textRdd2 = sc.textFile("/home/cloudera/TRF/PCFP/MAIN")
    val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|", -1)))
    val df2 = sqlContext.createDataFrame(rowRdd2, schema)

    // Finding the diff between the two if any of the columns has changed
    val diffAnyColumnDF = df1.except(df2)

    // Collapse the result into a single partition and write it out
    diffAnyColumnDF.rdd.coalesce(1).saveAsTextFile("Diffoutput")
  }
}

It takes more than 30 minutes and then it fails.

It fails with the following exception. Here is the log:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:55:01 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)
17/09/15 11:56:19 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@7fe57079,BlockManagerId(1, c755kds.int.int.com, 33507))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:491)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1818)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:520)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:56:19 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)

Please suggest how to tune my Spark job.

I just changed the executor memory and the job succeeded, but it is very slow:

conf.set("spark.executor.memory", "64g")

But the job is very slow; it takes around 15 minutes to complete.

Attaching the DAG visualization.

After increasing the timeout conf, I get the error below:

 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175200 ms
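
For reference, "increasing the timeout conf" in Spark typically means raising settings such as the ones below. This is only a sketch: the exact keys and values the asker changed are not shown in the question, so these are assumptions.

// Illustrative only: these keys and values are assumptions, not taken from the question.
// spark.executor.heartbeatInterval defaults to 10s (the value seen in the log above);
// spark.network.timeout should be set larger than the heartbeat interval.
conf.set("spark.executor.heartbeatInterval", "60s")
conf.set("spark.network.timeout", "600s")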

Recommended answer

I think your single partition's file size is too big. It takes a long time to stream that data over the TCP channel, the connection cannot be kept alive for that long, and it gets reset.

Can you coalesce to a higher number of partitions?
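
A minimal sketch of what this suggests, assuming only the write step is changed; the partition count of 100 is an illustrative number, not a value from the answer:

// Instead of coalesce(1), which funnels the entire result through a single
// task and produces one huge output file, keep more partitions so the write
// is spread across executors. 100 is an illustrative assumption; size it to
// the data and cluster.
val diffAnyColumnDF = df1.except(df2)
diffAnyColumnDF.rdd
  .coalesce(100)
  .saveAsTextFile("Diffoutput")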

This concludes this article on saveAsTextFile hanging in Spark with java.io.IOException: Connection reset by peer when working with DataFrames; hopefully the recommended answer is helpful.
