Problem description
I am running a Spark application that does a simple diff between two DataFrames. I execute it as a jar file in my cluster environment, which is a 94-node cluster. There are two data sets, 2 GB and 4 GB, each mapped to a DataFrame.
My job works fine for very small files.
I personally think saveAsTextFile takes most of the time in my application. Below are my cluster config details:
Total Vmem allocated for Containers 394.80 GB
Total VCores allocated for Containers 36
This is how I run my Spark job:
spark-submit --queue root.queue --deploy-mode client --master yarn SparkApplication-SQL-jar-with-dependencies.jar
Here is my code:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType, DoubleType, IntegerType }
import org.apache.spark.sql.functions._

object TestDiff {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount")
    conf.set("spark.executor.memory", "32g")
    conf.set("spark.driver.memory", "32g")
    conf.set("spark.driver.maxResultSize", "4g")

    val sc = new SparkContext(conf) // Creating the Spark context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Schema for the pipe-delimited input files
    val schema = StructType(Array(
      StructField("filler1", StringType),
      StructField("dunsnumber", StringType),
      StructField("transactionalindicator", StringType)))

    // Read both data sets and turn each pipe-delimited line into a Row
    // (the -1 split limit keeps trailing empty fields)
    val textRdd1 = sc.textFile("/home/cloudera/TRF/PCFP/INCR")
    val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|", -1)))
    var df1 = sqlContext.createDataFrame(rowRdd1, schema)

    val textRdd2 = sc.textFile("/home/cloudera/TRF/PCFP/MAIN")
    val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|", -1)))
    var df2 = sqlContext.createDataFrame(rowRdd2, schema)

    // Finding the diff between the two if any of the columns has changed
    val diffAnyColumnDF = df1.except(df2)

    // Collapse to a single partition and write the result out
    diffAnyColumnDF.rdd.coalesce(1).saveAsTextFile("Diffoutput")
  }
}
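(As a quick check, the number of partitions flowing into that final coalesce(1) can be printed before the write; this is just a diagnostic sketch reusing the variables above.)

// Diagnostic sketch: how many partitions does the diff carry before coalesce(1)?
// Depending on the Spark version this comes either from spark.sql.shuffle.partitions
// or from the parallelism of the input files.
println("diff partitions: " + diffAnyColumnDF.rdd.partitions.length)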
It takes more than 30 minutes and then fails with the exception below. Here is the log:
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more
17/09/15 11:55:01 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)
17/09/15 11:56:19 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@7fe57079,BlockManagerId(1, c755kds.int.int.com, 33507))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:491)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:520)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1818)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:520)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more
17/09/15 11:56:19 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)
Please suggest how to tune my Spark job.
I just changed the executor memory and the job succeeded, but it is very slow:
conf.set("spark.executor.memory", "64g")
The job is still very slow; it takes around 15 minutes to complete.
Attaching the DAG visualization.
After increasing the timeout configuration, I get the error below:
executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175200 ms
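For reference, timeouts like this are typically raised on the SparkConf before the SparkContext is created; the values below are illustrative only, not the exact settings used here.

// Sketch with illustrative values, applied before new SparkContext(conf)
conf.set("spark.executor.heartbeatInterval", "60s") // default is 10s, which matches the error above
conf.set("spark.network.timeout", "600s")           // general RPC/network timeout; should be larger than the heartbeat interval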
Recommended answer
I think your single partitioned file is too big. It takes a long time to stream that data over the TCP channel, the connection cannot be kept alive for that long, and it gets reset.
Can you coalesce to a higher number of partitions?
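For example, the final write could keep many partitions instead of collapsing to one; the count of 100 below is just an illustrative value, and dropping the coalesce entirely also keeps the existing partitioning.

// Sketch: write the diff with many output partitions instead of one.
diffAnyColumnDF.rdd.coalesce(100).saveAsTextFile("Diffoutput")

// Or keep the partitioning produced by except():
// diffAnyColumnDF.rdd.saveAsTextFile("Diffoutput")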