问题描述
这里有新的Spark用户。我正在从AWS S3上存储的许多.tif图像中提取特征,每个图像都具有02_R4_C7等标识符。我正在使用Spark 2.2.1和hadoop 2.7.2。
New Spark user here. I'm extracting features from many .tif images stored on AWS S3, each with identifier like 02_R4_C7. I'm using Spark 2.2.1 and hadoop 2.7.2.
我正在使用所有默认配置:
I'm using all default configurations like so:
conf = SparkConf().setAppName("Feature Extraction")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
这是一个函数调用,在某些功能成功保存在image-xxxx的映像id文件夹中后失败.gz文件:
And here is the function call that this fails on after some features are successfully saved in an image id folder as part-xxxx.gz files:
features_labels_rdd.saveAsTextFile(text_rdd_direct,org.apache.hadoop.io.compress.GzipCodec)
请参阅下面的错误。当我删除已成功创建的功能part-xxxx.gz文件并重新运行脚本时,它会以不同的图像和part-xxxxx.gz以看似不确定的方式失败。我确保在重新运行之前删除所有功能。我的理论是,两个工作者正在尝试创建相同的临时文件并且彼此冲突,因为同一文件有两个相同的错误消息,但相隔一秒。
See error below. When I delete the feature part-xxxx.gz files that were successfully created and rerun the script, it fails at a different image and part-xxxxx.gz in a seemingly nondeterminsitic way. I make sure to remove all features before rerunning. My theory is that two workers are trying to create the same temp file and are conflicting with each other, since there are two identical error messages for the same file, but one second apart.
我对此怎么办感到茫然,我看到了火花清单可以改变spark处理任务的方式,但我不确定这会有什么帮助,因为我不理解我遇到的问题。非常感谢任何帮助!
I'm at a loss about what to do about this, I've seen that spark lists configurations that can change how spark handles tasks but I'm not sure what would help here since I don't understand the issue I'm having. Any help is greatly appreciated!
SLF4J: Class path contains multiple SLF4J bindings.
*SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-
log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:24:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:24:41 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
[Stage 3:=================> (6 + 14) / 20]18/06/26 19:24:58 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626192453_0003_m_000007_59
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:=====================================> (13 + 7) / 20]18/06/26 19:24:58 ERROR executor.Executor: Exception in task 7.0 in stage 3.0 (TID 59)
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/06/26 19:24:58 ERROR scheduler.TaskSetManager: Task 7 in stage 3.0 failed 1 times; aborting job
Traceback (most recent call last):
File "run_feature_extraction_spark.py", line 88, in <module>
main(sc)
File "run_feature_extraction_spark.py", line 75, in main
features_labels_rdd.saveAsTextFile(text_rdd_direct, "org.apache.hadoop.io.compress.GzipCodec")
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 1551, in saveAsTextFile
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3.0 (TID 59, localhost, executor driver): java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz*
当我再次运行它时,脚本使它更远,但是使用不同的图像文件夹和part-xxxx.gz文件失败并出现相同的错误
And when I run it again, the script makes it farther but fails with the same error with a different image folder and part-xxxx.gz file
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:37:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:37:24 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
Feature file of 02_R4_C6 is created
Feature file of 02_R4_C5 is created
Feature file of 02_R4_C4 is created
Feature file of 02_R4_C3 is created
Feature file of 02_R4_C2 is created
Feature file of 02_R4_C1 is created
[Stage 15:==========================================> (15 + 5) / 20]18/06/26 19:38:16 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626193811_0015_m_000017_285
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/_temporary/0/_temporary/attempt_20180626193811_0015_m_000017_285/part-00017.gz; isDirectory=false; length=896020; replication=1; blocksize=67108864; modification_time=1530041897000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/part-00017.gz
推荐答案
如果没有一致性层(一致的EMR,或来自Apache Hadoop项目本身,S3Guard),或者使用S3作为直接工作目的地是不安全的专门为S3工作而设计的特殊输出提交器(Hadoop 3.1+S3A提交者)。重命名是事情失败的地方,因为列表不一致意味着扫描要复制的文件可能会丢失数据,或者找到无法重命名的已删除文件。您的堆栈跟踪看起来完全是我预期的结果:作业提交显然随机失败。
It's not safe to use S3 as a direct destination of work without a "consistency layer" (Consistent EMR, or from the Apache Hadoop project itself, S3Guard), or a Special output committer designed explicitly for work with S3 (Hadoop 3.1+ "the S3A committers"). Rename is where things fail, as listing inconsistency means that the scan for files to copy may miss data, or find deleted files which it can't rename. Your stack trace looks exactly how I'd expect this to surface: job commits failing apparently at random.
这里有一段
解决方法:写入本地群集FS然后使用distcp上传到S3。
Workaround: write to your local cluster FS then use distcp to upload to S3.
PS:对于Hadoop 2.7+,切换到s3a://连接器。它没有启用S3Guard时具有完全相同的一致性问题,但性能更好。
PS: for Hadoop 2.7+, switch to the s3a:// connector. It has exactly the same consistency problem without S3Guard enabled, but better performance.
这篇关于Spark抛出java.io.IOException:保存part-xxxxx.gz时无法重命名的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!