问题描述
我正在手动创建一个数据框以进行一些测试.创建它的代码是:
I'm manually creating a dataframe for some testing. The code to create it is:
case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
.createDataFrame(List(input(1110,0,1001,-10.00),
input(1111,1,1001,10.00),
input(1111,0,1002,10.00)))
所以架构看起来像这样:
So the schema looks like this:
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
我想为这些变量中的每一个设置'nullable = true'.如何从一开始就声明它,或者在创建之后将其切换到新的数据框中?
I want to make 'nullable = true' for each one of these variable. How do I declare that from the start or switch it in a new dataframe after it's been created?
推荐答案
答案
与进口
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
您可以使用
/**
* Set nullable property of column.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either nullable or not
*/
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
case y: StructField => y
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
直接.
您还可以通过"pimp my library"库模式使该方法可用(请参阅我的SO帖子),这样您就可以调用
Also you can make the method available via the "pimp my library" library pattern ( see my SO post What is the best way to define custom methods on a DataFrame? ), such that you can call
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
编辑
替代解决方案1
使用setNullableStateOfColumn
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
// get schema
val schema = df.schema
// modify [[StructField] with name `cn`
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
})
// apply new schema
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
替代解决方案2
明确定义架构. (使用反射来创建更通用的解决方案)
Alternative solution 2
Explicitely define the schema. (Use reflection to create a solution that is more general)
configuredUnitTest("Stackoverflow.") { sparkContext =>
case class Input(id:Long, var1:Int, var2:Int, var3:Double)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
// use this to set the schema explicitly or
// use refelection on the case class member to construct the schema
val schema = StructType( Seq (
StructField( "id", LongType, true),
StructField( "var1", IntegerType, true),
StructField( "var2", IntegerType, true),
StructField( "var3", DoubleType, true)
))
val is: List[Input] = List(
Input(1110, 0, 1001,-10.00),
Input(1111, 1, 1001, 10.00),
Input(1111, 0, 1002, 10.00)
)
val rdd: RDD[Input] = sparkContext.parallelize( is )
val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
val inputDF = sqlContext.createDataFrame( rowRDD, schema )
inputDF.printSchema
inputDF.show()
}
这篇关于更改Spark数据框中的列的可为空属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!