我是SPARK的新手,并想出一种更好的方法来实现以下方案。
有一个包含3个字段的数据库表-类别,金额,数量。
首先,我尝试从数据库中提取所有不同的类别。

 val categories:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)

现在,对于每个类别,我要执行管道,该管道实质上从每个类别创建数据帧并应用一些机器学习。
 categories.foreach(executePipeline)
 def execute(category: String): Unit = {
   val dfCategory = sqlCtxt.read.jdbc(JDBC_URL,"SELECT * FROM TABLE_NAME WHERE CATEGORY="+category)
dfCategory.show()
}

可以做这样的事情吗?还是有更好的选择?

最佳答案

// You could get all your data with a single query and convert it to an rdd
val data = sqlCtxt.read.jdbc(JDBC_URL,"SELECT * FROM TABLE_NAME).rdd

// then group the data by category
val groupedData = data.groupBy(row => row.getAs[String]("category"))

// then you get an RDD[(String, Iterable[org.apache.spark.sql.Row])]
// and you can iterate over it and execute your pipeline
groupedData.map { case (categoryName, items) =>
  //executePipeline(categoryName, items)
}

08-27 23:48
查看更多