I am new to Spark and Scala. I am trying to call a function as a Spark UDF, but I run into an error that I can't seem to resolve.

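I understand that in Scala, Array and Seq are not the same. WrappedArray is a subtype of Seq, and there are implicit conversions between WrappedArray and Array, but I am not sure why that does not happen in the case of the UDF.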

Any pointers to help me understand and resolve this are much appreciated.

Here is the code snippet:

def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF

idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

import sqlContext.implicits._
/* Hive context is exposed as sqlContext */

val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show

Looking at DataFrames "j" and "k", the map and array columns have the right data types.
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

However, an action on DataFrame "k" that invokes the UDF fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Best answer

Changing the parameter type from Array[Int] to Seq[Int] in the function filterMapKeysWithSet seems to resolve the above issue.

def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

+---+--------------------+----------------+--------------------+
| id|                   m|               a|         filteredMap|
+---+--------------------+----------------+--------------------+
|  1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
|  2|Map(1 -> 100, 2 -...|      [100, 200]|               Map()|
|  3|Map(3 -> 300, 4 -...|          [1, 2]|               Map()|
+---+--------------------+----------------+--------------------+
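
The filteredMap column in the table above can be sanity-checked without Spark. The following is a minimal sketch in plain Scala, not the original code: the object name `FilterSketch` is hypothetical, and it uses `filter` with a key set in place of `filterKeys` (my substitution, so that the return type stays a strict `Map[Int, Int]` across Scala versions).

```scala
object FilterSketch {
  // Same signature as the corrected function: keys arrive as a Seq, not an Array.
  def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
    val keys = a.toSet
    // Keep only the entries whose key appears in `a`.
    m.filter { case (k, _) => keys.contains(k) }
  }

  def main(args: Array[String]): Unit = {
    // The three joined rows from the example data (ids 1, 2 and 3).
    println(filterMapKeysWithSet(Map(1 -> 100, 2 -> 200), Seq(1, 2, 100, 200)))
    println(filterMapKeysWithSet(Map(1 -> 100, 2 -> 200), Seq(100, 200)))
    println(filterMapKeysWithSet(Map(3 -> 300, 4 -> 400), Seq(1, 2)))
  }
}
```

Only the first row keeps any entries; the other two come back empty, which agrees with the table.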

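So it looks like the ArrayType column on DataFrame "idDF" is really a WrappedArray and not an Array. The call to "filterMapKeysWithSet" therefore failed because it expected an Array but received a WrappedArray/Seq (which is not implicitly converted to Array in Scala 2.8 and above).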
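
The ClassCastException can be reproduced in plain Scala, which may help show why no implicit conversion kicks in. Implicit conversions are inserted by the compiler at call sites whose static types it can see. A function value invoked through an erased type receives whatever object is passed, and the cast to `int[]` (`[I` in the error message) happens at runtime. The sketch below assumes that Spark invokes the UDF closure in a comparable untyped way; the object `ErasureSketch` and its members are hypothetical names used only for illustration.

```scala
object ErasureSketch {
  // A function declared for Array[Int], like the original UDF closure.
  val expectsArray: Array[Int] => Set[Int] = arr => arr.toSet

  // Forget the static parameter type, as a generic caller would see it.
  val erased: Any => Set[Int] = expectsArray.asInstanceOf[Any => Set[Int]]

  // Returns "ok" if the call succeeds, otherwise the exception's simple class name.
  def call(arg: Any): String =
    try { erased(arg); "ok" }
    catch { case e: ClassCastException => e.getClass.getSimpleName }

  def main(args: Array[String]): Unit = {
    println(call(Array(1, 2, 3))) // a real int[] is accepted
    println(call(Seq(1, 2, 3)))   // a Seq is not an int[], so the cast fails
  }
}
```

Declaring the parameter as `Seq[Int]`, as the answer does, avoids the problem because the value handed over at runtime already is a Seq.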

Regarding apache-spark - scala.collection.mutable.WrappedArray$ofRef cannot be cast to Integer, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40199507/
