根据对Convert Spark DataFrame to Pojo Object的答复,我了解到Dataframe
是Dataset<Row>
的别名。
我目前计算了一个JavaPairRDD<CityCode, CityStatistics>
,其中CityStatistics
是一个POJO,其中包含诸如以下成员的getter和setter:getCityCode()
,getCityName()
,getActivityCode()
,getNumberOfSalaried()
,getNumberOfCompanies()
...Liquibase
脚本创建了一个统计表,其中存在这些字段(CITYCODE
,CITYNAME
,ACTIVITYCODE
...)。我只需要写记录。
从我的JavaPairRDD<CityCode, CityStatistics> citiesStatisticsRDD
做类似的事情的(或之前有什么)干净的方法是什么?citiesStatisticsRDD.values()
=> DataSet<CityStatistics>
=> DataSet<Row> (= DataFrame)
=>通过数据帧方法在JDBC连接上写?
谢谢 !
最佳答案
首先,您必须将JavaPairRDD转换为RDD,因为.createDataset() accepts RDD<T> not JavaRDD<T>
。JavaRDD
是RDD的包装,以便于从Java代码进行调用。它内部包含RDD,可以使用.rdd()访问
JavaRDD cityRDD = citiesStatisticsRDD.map(x -> x._2);
Dataset<CityStatistics> cityDS = sqlContext.createDataset(cityRDD.rdd(), Encoders.bean(CityStatistics.class))
现在,如果您要将整个cityStatisticsRDD转换为数据集:将JavaPairRDD转换为RDD,然后使用编码器
Dataset<Row> cityDS = sqlContext.createDataset(citiesStatisticsRDD.values().rdd(), Encoders.bean(CityStatistics.class)).toDF();