I have written a fairly complex SparkR script and run it with spark-submit. What the script essentially does is read a large Hive/Impala-backed Parquet table row by row and produce a new Parquet file with the same number of rows.
However, the job seems to stop after roughly 100 minutes, which looks like some kind of timeout.
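
In outline, the script does something like the following (a minimal sketch with placeholder table and path names; the real script cannot be shared, see below):

    library(SparkR)

    sc <- sparkR.init(appName = "sparkr-pre")   # real settings come from spark-submit / spark-defaults.conf
    hiveCtx <- sparkRHive.init(sc)              # Hive metastore access for the source table

    # Read the large Parquet-backed table (database/table name is a placeholder)
    src <- sql(hiveCtx, "SELECT * FROM mydb.big_parquet_table")

    # ... per-row transformations omitted ...

    # Write the result out as a new Parquet file (output path is a placeholder)
    write.df(src, path = "hdfs:///user/me/output.parquet", source = "parquet", mode = "overwrite")

    sparkR.stop()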

  • For up to about 500,000 rows the script runs perfectly (because it finishes in under 100 minutes).
  • For 1, 2, 3 or more million rows the script exits after 100 minutes.

  • I have checked every configuration value I know of and tested settings around the 100-minute range, but could not find any solution.
    [user@localhost R]$ time spark-submit sparkr-pre.R
    Loading required package: methods
    
    Attaching package: ‘SparkR’
    
    The following objects are masked from ‘package:stats’:
    
        filter, na.omit
    
    The following objects are masked from ‘package:base’:
    
        intersect, rbind, sample, subset, summary, table, transform
    
    15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    [Stage 1:========================================>                 (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
    Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
    Execution halted
    15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
    org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
            at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
            at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
            at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
            at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
            at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
            at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
            at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
            at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
            at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
            at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
            at scala.util.Try$.apply(Try.scala:161)
            at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
            at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
            at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
            at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
            at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
            at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
            at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
            at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
            at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
            at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
            at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
            at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
            at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
            at java.lang.Thread.run(Thread.java:745)
    15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
    15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed
    
    real    100m30.944s
    user    1m26.326s
    sys     0m19.459s
    

    Environment
    Runtime Information
    Name    Value
    Java Home   /usr/java/jdk1.8.0_40/jre
    Java Version    1.8.0_40 (Oracle Corporation)
    Scala Version   version 2.10.4
    Spark Properties
    
    Name    Value
    spark.akka.frameSize    1024
    spark.app.id    application_1451466100034_0019
    spark.app.name  SparkR
    spark.driver.appUIAddress   http://x.x.x.x:4040
    spark.driver.host   x.x.x.x
    spark.driver.maxResultSize  8G
    spark.driver.memory 100G
    spark.driver.port   60471
    spark.executor.id   driver
    spark.executor.memory   14G
    spark.executorEnv.LD_LIBRARY_PATH   $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
    spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
    spark.fileserver.uri    http://x.x.x.x:50281
    spark.home  /datas/spark-1.5.2-bin-hadoop2.6
    spark.kryoserializer.buffer.max 2000M
    spark.master    yarn-client
    spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS  CDHPR1.dc.dialog.lk
    spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES  http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
    spark.scheduler.mode    FIFO
    spark.serializer    org.apache.spark.serializer.KryoSerializer
    spark.sql.parquet.binaryAsString    true
    spark.submit.deployMode client
    spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    spark.yarn.dist.archives    file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
    spark.yarn.dist.files   file:/home/inuser/R/sparkr-pre.R
    System Properties
    
    Name    Value
    SPARK_SUBMIT    true
    SPARK_YARN_MODE true
    awt.toolkit sun.awt.X11.XToolkit
    file.encoding   UTF-8
    file.encoding.pkg   sun.io
    file.separator  /
    java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
    java.awt.printerjob sun.print.PSPrinterJob
    java.class.version  52.0
    java.endorsed.dirs  /usr/java/jdk1.8.0_40/jre/lib/endorsed
    java.ext.dirs   /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
    java.home   /usr/java/jdk1.8.0_40/jre
    java.io.tmpdir  /tmp
    java.library.path   :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
    java.runtime.name   Java(TM) SE Runtime Environment
    java.runtime.version    1.8.0_40-b26
    java.specification.name Java Platform API Specification
    java.specification.vendor   Oracle Corporation
    java.specification.version  1.8
    java.vendor Oracle Corporation
    java.vendor.url http://java.oracle.com/
    java.vendor.url.bug http://bugreport.sun.com/bugreport/
    java.version    1.8.0_40
    java.vm.info    mixed mode
    java.vm.name    Java HotSpot(TM) 64-Bit Server VM
    java.vm.specification.name  Java Virtual Machine Specification
    java.vm.specification.vendor    Oracle Corporation
    java.vm.specification.version   1.8
    java.vm.vendor  Oracle Corporation
    java.vm.version 25.40-b25
    line.separator
    os.arch amd64
    os.name Linux
    os.version  2.6.32-431.el6.x86_64
    path.separator  :
    sun.arch.data.model 64
    sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
    sun.boot.library.path   /usr/java/jdk1.8.0_40/jre/lib/amd64
    sun.cpu.endian  little
    sun.cpu.isalist
    sun.io.unicode.encoding UnicodeLittle
    sun.java.command    org.apache.spark.deploy.SparkSubmit sparkr-pre.R
    sun.java.launcher   SUN_STANDARD
    sun.jnu.encoding    UTF-8
    sun.management.compiler HotSpot 64-Bit Tiered Compilers
    sun.nio.ch.bugLevel
    sun.os.patch.level  unknown
    user.country    US
    user.dir    /home/user/R
    user.home   /home/user
    user.language   en
    user.name   inuser
    user.timezone   Asia/Colombo
    Classpath Entries
    
    Resource    Source
    /datas/spark-1.5.2-bin-hadoop2.6/conf/  System Classpath
    /datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/    System Classpath
    /datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar  System Classpath
    /datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar    System Classpath
    /datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar    System Classpath
    /datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar   System Classpath
    

    spark-default.conf
    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.
    
    # Example:
    # spark.master                     spark://master:7077
    # spark.eventLog.enabled           true
    # spark.eventLog.dir               hdfs://namenode:8021/directory
    # spark.serializer                 org.apache.spark.serializer.KryoSerializer
    # spark.driver.memory              5g
    # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
    #
    spark.master            yarn-client
    spark.serializer        org.apache.spark.serializer.KryoSerializer
    spark.driver.memory             100G
    spark.executor.memory           14G
    spark.sql.parquet.binaryAsString true
    spark.kryoserializer.buffer.max 2000M
    spark.driver.maxResultSize      8G
    spark.akka.frameSize            1024
    #spark.executor.instances       16
    

    I cannot share the SparkR script publicly, sorry about that. However, the code works perfectly whenever it takes less than 100 minutes to complete.

    Best answer

    This is a known bug in Spark 1.6.0, see: https://issues.apache.org/jira/browse/SPARK-12609. A quick look at the SparkR code shows that the bug has in fact been present since Spark 1.4.0.

    Until a patched release is available, a quick and dirty workaround is to increase the timeout. As described in the JIRA issue, the offending function is connectBackend, whose default timeout of 6000 seconds is exactly 100 minutes and matches the observed cutoff. The function can be patched at runtime with assignInNamespace.

    The following code retrieves the original function, wraps it in a second function whose default timeout is raised to 48 hours, and then replaces the original with the wrapper.

    # Grab the original connectBackend() from the SparkR namespace
    connectBackend.orig <- getFromNamespace('connectBackend', pos = 'package:SparkR')
    # Wrapper that forwards the call unchanged, but with a 48-hour default timeout
    connectBackend.patched <- function(hostname, port, timeout = 3600 * 48) {
       connectBackend.orig(hostname, port, timeout)
    }
    # Swap the original function in the namespace for the patched wrapper
    assignInNamespace("connectBackend", value = connectBackend.patched, pos = 'package:SparkR')
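
    As a quick sanity check, the default of the timeout argument can be inspected afterwards; it should show the patched value of 3600 * 48:

    # The namespace binding should now be the wrapper with the 48-hour default
    formals(SparkR:::connectBackend)$timeout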
    

    Put this code in right after loading the SparkR package.
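
    For example, in a script launched through spark-submit, the order of operations would look roughly like this (a minimal sketch; the patch must run before sparkR.init() opens the backend connection):

    library(SparkR)

    # --- apply the connectBackend patch shown above here ---

    sc <- sparkR.init()                # connectBackend() now uses the 48-hour timeout
    sqlContext <- sparkRSQL.init(sc)

    # ... rest of the job ...

    sparkR.stop()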

    An alternative is to change the timeout in the SparkR sources and recompile the package. For build instructions, see: https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh

    Regarding hadoop - SparkR job timing out after 100 minutes, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34584284/
