I have been trying to convert an RDD to a DataFrame. For that, the types need to be defined rather than being Any. I am using Spark MLlib's PrefixSpan, which is where freqSequence.sequence comes from. I start with a DataFrame containing a session_id and string arrays of views and purchases:

viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
  [session_id: decimal(29,0), view_product_ids: array<string>, purchase_product_ids: array<string>]


I then compute the frequent patterns and need them in a DataFrame so that I can write them to a Hive table.

// row(i) returns Any, which is where the typing problem below originates
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())

case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)

val viewsPurchasesDf = freqSequencesRdd.map( fs => {
  val views = fs.sequence(0)(0)      // inferred as Any, not Array[String]
  val purchases = fs.sequence(1)(0)  // inferred as Any, not Array[String]
  val freq = fs.freq
  FreqSequences(views, purchases, freq)
})
viewsPurchasesDf.toDF() // optional
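The Any problem above can be reproduced without Spark: indexing an untyped container gives a static type of Any, and wrapping that in an Array yields Array[Any]. A minimal plain-Scala sketch (the sample values are made up for illustration; a Spark Row behaves analogously):

```scala
// A Row-like untyped container: apply(i) returns Any.
val row: Seq[Any] = Seq(BigDecimal(1), Array("v1", "v2"), Array("p1"))

val views = row(1)           // static type: Any
val wrapped = Array(row(1))  // static type: Array[Any]

// A cast recovers the element type, analogous to Row.getSeq[String](1):
val typed = row(1).asInstanceOf[Array[String]]
println(typed.mkString(","))  // v1,v2
```

This is why casting the result after the fact only gets you to Array[Any]: the type information has to be recovered where the Row is read, as the accepted answer below does with a typed getter.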


When I try to run this, views and purchases are Any instead of Array[String]. I have desperately tried to cast them, but the best I get is Array[Any]. I think I need to map the contents to Strings; I have tried, for example, this: How to get an element in WrappedArray: result of Dataset.select("x").collect()? and this: How to cast a WrappedArray[WrappedArray[Float]] to Array[Array[Float]] in spark (scala), as well as thousands of other Stack Overflow questions...

I really don't know how to solve this. I think I have converted the initial DataFrame/RDD one time too many, but I can't tell where.

Best Answer

I solved the problem. For reference, this works:

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    row.getSeq[Long](1).toArray,  // typed getter instead of row(1), so no Any
    row.getSeq[Long](2).toArray
  )
)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

case class FreqSequences(views: Long, purchases: Long, frequence: Long)

val ps_frequences = model.freqSequences.filter(fs => fs.sequence.length > 1).map( fs => {
  val views = fs.sequence(0)(0)
  val purchases = fs.sequence(1)(0)
  val freq = fs.freq
  FreqSequences(views, purchases, freq)
})

ps_frequences.toDF()
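For reference, the fs.sequence(0)(0) indexing works because each frequent sequence is an array of itemsets, and each itemset is an array of items, so both lookups now yield a plain Long. A minimal plain-Scala sketch with hypothetical item ids (no Spark needed):

```scala
// Shape of one PrefixSpan frequent sequence over Long items:
// an Array of itemsets, each itemset an Array of items.
val sequence: Array[Array[Long]] = Array(Array(101L), Array(202L))

val views = sequence(0)(0)      // first item of the first itemset
val purchases = sequence(1)(0)  // first item of the second itemset
println(s"$views -> $purchases")  // 101 -> 202
```

The filter on fs.sequence.length > 1 matters for the same reason: sequences with a single itemset would make sequence(1)(0) throw an ArrayIndexOutOfBoundsException.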
