我正在写一个小的UDF
val transform = udf((x: Array[Byte]) => {
val mapper = new ObjectMapper() with ScalaObjectMapper
val stream: InputStream = new ByteArrayInputStream(x);
val obs = new ObjectInputStream(stream)
val stock = mapper.readValue(obs, classOf[util.Hashtable[String, String]])
stock
})
我在哪里出错
java.lang.UnsupportedOperationException: Schema for type java.util.Hashtable[String,String] is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:809)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:740)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:926)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:739)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:736)
at org.apache.spark.sql.functions$.udf(functions.scala:3898)
... 59 elided
任何人都可以帮助了解为什么会这样吗?
最佳答案
您得到的错误仅表示Spark无法理解Java哈希表。我们可以使用此简单的UDF
重现您的错误。
val gen = udf(() => new java.util.Hashtable[String, String]())
Spark尝试从
DataType
创建一个java.util.Hashtable
(放入Spark模式),但它不知道如何做。 Spark可以理解scala地图。确实以下代码val gen2 = udf(() => Map("a" -> "b"))
spark.range(1).select(gen2()).show()
产量
+--------+
| UDF()|
+--------+
|[a -> b]|
+--------+
要解决第一个
UDF
和您自己的问题,您可以将Hashtable转换为scala映射。使用HashMap
可以轻松地转换JavaConverters
。我不知道使用Hashtable
的任何简便方法,但是您可以通过以下方式实现:import collection.JavaConverters._
val gen3 = udf(() => {
val table = new java.util.Hashtable[String, String]()
table.put("a", "b")
Map(table.entrySet.asScala.toSeq.map(x => x.getKey -> x.getValue) :_*)
})
关于java - Hashtable [Scala错误] [String,String],我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/59814697/