ClosedChannelException

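I have a Spark application. I use saveAsNewAPIHadoopDataset with AvroKeyOutputFormat to store an RDD on HDFS.

For large RDDs, I sometimes get so many ClosedChannelException errors that the application eventually aborts.

I read somewhere that setting hadoopConf.set("fs.hdfs.impl.disable.cache", "false"); helps.

This is how I save the RDD: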

        hadoopConf.set("fs.hdfs.impl.disable.cache", "false");
        final Job job = Job.getInstance(hadoopConf);
        FileOutputFormat.setOutputPath(job, outPutPath);
        AvroJob.setOutputKeySchema(job, MyClass.SCHEMA$);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        rdd
                .mapToPair(new PreparePairForDatnum())
                .saveAsNewAPIHadoopDataset(job.getConfiguration());

Here is the stack trace:
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765)
    at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369)
    at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
    at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:340)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Suppressed: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1765)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:108)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:458)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
        at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
        at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:369)
        at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:395)
        at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:413)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:422)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:445)
        at org.apache.avro.mapreduce.AvroKeyRecordWriter.close(AvroKeyRecordWriter.java:83)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1043)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1215)
        ... 8 more

Best answer

This can happen when an executor is killed. Look for something like this in your logs:

2016-07-20 22:00:42,976 | WARN  | org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint | Container container_e10838_1468831508103_1724_01_055482 on host: hostName was preempted.
2016-07-20 22:00:42,977 | ERROR | org.apache.spark.scheduler.cluster.YarnClusterScheduler | Lost executor 6 on hostName: Container container_e10838_1468831508103_1724_01_055482 on host: hostName was preempted.

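If you find such lines, YARN has preempted the executor that was running your task. In other words, its container was killed and the resources were given to another queue. See the YARN documentation on preemption and scheduling for details.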
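The log check described above can be sketched as a small standalone Java program. It is only an illustration: the class name PreemptionLogScan, the method preemptedExecutors, and the idea of passing the driver log path as the first argument are assumptions, and the regular expression is derived solely from the two sample log lines shown in this answer, so other Spark or YARN versions may word the message differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark or Hadoop.
final class PreemptionLogScan {

    // Matches driver log lines of the form seen in the sample above:
    //   "... Lost executor 6 on hostName: Container ... on host: hostName was preempted."
    // The exact wording is an assumption based on that sample.
    private static final Pattern LOST_BY_PREEMPTION =
            Pattern.compile("Lost executor (\\S+) on (\\S+): .*was preempted");

    /** Returns "executorId@host" for each executor reported lost because of preemption. */
    static List<String> preemptedExecutors(List<String> logLines) {
        List<String> result = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = LOST_BY_PREEMPTION.matcher(line);
            if (m.find()) {
                result.add(m.group(1) + "@" + m.group(2));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: PreemptionLogScan <driver-log-file>");
            return;
        }
        // Reads the whole log into memory; fine for a quick check, not for huge logs.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        for (String executor : preemptedExecutors(lines)) {
            System.out.println("preempted: " + executor);
        }
    }
}
```

If the program prints any executors whose loss coincides with the failed tasks, preemption is a likely cause of the ClosedChannelException; if it prints nothing, the executors were probably lost for another reason.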

Source: the Stack Overflow question "java - Spark job failing due to ClosedChannelException (DFSOutputStream.checkClosed)": https://stackoverflow.com/questions/37733004/

10-11 10:55