我有Java代码将JavaRDD
转换为Dataset
并将其保存到HDFS:
Dataset<User> userDataset = sqlContext.createDataset(userRdd.rdd(), Encoders.bean(User.class));
userDataset.write.json("some_path");
User
类是用Scala语言定义的:case class User(val name: Name, val address: Seq[Address]) extends Serializable
case class Name(firstName: String, lastName: Option[String])
case class Address(address: String)
代码符合并成功运行,文件保存到HDFS,而输出文件中的
User
类具有空架构:val users = spark.read.json("some_path")
users.count // 100,000 which is same as "userRdd"
users.printSchema // users: org.apache.spark.sql.DataFrame = []
为什么
Encoders.bean
在这种情况下不起作用? 最佳答案
Encoders.bean
不支持Scala case类,Encoders.product
支持。 Encoders.product
将TypeTag
作为参数,而在Java中无法初始化TypeTag
。我创建了一个Scala对象来提供TypeTag
:
import scala.reflect.runtime.universe._
object MyTypeTags {
val UserTypeTag: TypeTag[User] = typeTag[User]
}
然后在Java代码中:
Dataset<User> userDataset = sqlContext.createDataset(userRdd.rdd(), Encoders.product(MyTypeTags.UserTypeTag()));