根据对Convert Spark DataFrame to Pojo Object的答复,我了解到DataframeDataset<Row>的别名。

我目前计算了一个JavaPairRDD<CityCode, CityStatistics>,其中CityStatistics是一个POJO,其中包含诸如以下成员的getter和setter:getCityCode()getCityName()getActivityCode()getNumberOfSalaried()getNumberOfCompanies() ...

Liquibase脚本创建了一个统计表,其中存在这些字段(CITYCODECITYNAMEACTIVITYCODE ...)。我只需要写记录。

从我的JavaPairRDD<CityCode, CityStatistics> citiesStatisticsRDD做类似的事情的(或之前有什么)干净的方法是什么?
citiesStatisticsRDD.values() => DataSet<CityStatistics> => DataSet<Row> (= DataFrame) =>通过数据帧方法在JDBC连接上写?

谢谢 !

最佳答案

首先,您必须将JavaPairRDD转换为RDD,因为.createDataset() accepts RDD<T> not JavaRDD<T>JavaRDD是RDD的包装,以便于从Java代码进行调用。它内部包含RDD,可以使用.rdd()访问

JavaRDD cityRDD = citiesStatisticsRDD.map(x -> x._2);
Dataset<CityStatistics> cityDS =  sqlContext.createDataset(cityRDD.rdd(), Encoders.bean(CityStatistics.class))


现在,如果您要将整个cityStatisticsRDD转换为数据集:将JavaPairRDD转换为RDD,然后使用编码器

Dataset<Row> cityDS = sqlContext.createDataset(citiesStatisticsRDD.values().rdd(), Encoders.bean(CityStatistics.class)).toDF();

08-27 10:38