本文介绍了Spark 2.2 无法将 df 写入 parquet的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在构建一个聚类算法,我需要存储模型以备将来加载.我有一个带有此架构的数据框:

I'm building a clustering algorithm and I need to store the model for future loading. I have a dataframe with this schema:

val schema = new StructType()
        .add(StructField("uniqueId", LongType))
        .add(StructField("timestamp", LongType))
        .add(StructField("pt", ArrayType(DoubleType)))
        .add(StructField("norm", DoubleType))
        .add(StructField("kNN", ArrayType(LongType)))
        .add(StructField("kDist", DoubleType))
        .add(StructField("lrd", DoubleType))
        .add(StructField("lof", DoubleType))
        .add(StructField("isClusterCenter", BooleanType))
        .add(StructField("clusterSize", DoubleType))
        .add(StructField("clusterId", IntegerType))

我正在使用 parquet() 方法来编写 parquet 文件:

I'm using parquet() method to write the parquet file:

df.write.mode(SaveMode.Overwrite).parquet(Loader.dataPath("/tmp/milof/model"))

我已经打印了数据框,看起来不错

I've printed the dataframe and it looks good

+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
|uniqueId|    timestamp|                  pt|              norm|         kNN|  kDist|                 lrd|                lof|isClusterCenter|clusterSize|clusterId|
+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
|       1|1516459162000|[14.0, 78.0, 52.0...|219.61784991206886|[2, 3, 5, 4]|54363.0|4.950813666226044E-5| 0.3926170684395501|          false|        5.0|        1|

但是当我到达上述行时,我收到以下错误:

but when I reach the above line I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
    at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel$SaveLoadV1_0$.save(MiLOFModel.scala:593)
    at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel.save(MiLOFModel.scala:364)
    at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer$.main(KafkaTrainer.scala:91)
    at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer.main(KafkaTrainer.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 77, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
    at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
    at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
    at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.types.StructType$.fromString(StructType.scala:414)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:80)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:303)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:312)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
    ... 8 more

谁能解释一下这是什么意思?我怀疑 DataFrameWriter 不支持某些类型,但我在 Internet 上没有找到任何关于此的信息.任何建议将不胜感激.谢谢.

Can somebody explain what this means? My suspect is that some types are not supported to DataFrameWriter but I didn't find anything on the Internet about this.Any suggestion will be appreciated. Thanks.

版本

Spark 2.2.1
Scala 2.11.11
Json4S 'org.json4s', name: 'json4s-jackson_2.11', version: '3.6.0-M2'

推荐答案

切换到以前版本的 json4s 依赖项有效,我使用了以下内容

Switching to previous version of json4s dependency worked, I've used the following

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.11</artifactId>
    <version>3.2.11</version>
</dependency>

这篇关于Spark 2.2 无法将 df 写入 parquet的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-01 04:38