I was trying to run an example of the FPGrowth algorithm in Spark, but I ran into an error. Here is my code:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
val transactions: RDD[Array[String]] = sc.textFile("path/transations.txt").map(_.split(" ")).cache()
val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
The code runs fine until this last line, where I get the error:
WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 16, ip-10-0-0-###.us-west-1.compute.internal):
com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set
final scala.collection.mutable.ListBuffer field org.apache.spark.mllib.fpm.FPTree$Summary.nodes to scala.collection.mutable.ArrayBuffer
Serialization trace:
nodes (org.apache.spark.mllib.fpm.FPTree$Summary)
I even tried the solution proposed in SPARK-7483, but had no luck. (As far as I can tell from the trace, Kryo fails because it tries to assign an ArrayBuffer to the final ListBuffer field FPTree$Summary.nodes during deserialization.)
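For reference, the workaround discussed in that ticket boils down to registering the mutable collection classes from the serialization trace with Kryo before the job runs. A rough sketch of that configuration (my paraphrase of the ticket, not code copied from it), either set on a SparkConf or passed via --conf on the command line:

import org.apache.spark.SparkConf

// Register the classes named in the serialization trace with Kryo.
val conf = new SparkConf()
  .set("spark.kryo.classesToRegister",
    "scala.collection.mutable.ArrayBuffer,scala.collection.mutable.ListBuffer")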
Has anyone found a fix? Or does anyone know how to view the results, or save them to a text file?
Any help would be much appreciated!
I also found the complete source code for the algorithm here:
http://mail-archives.apache.org/mod_mbox/spark-commits/201502.mbox/%3C1cfe817dfdbf47e3bbb657ab343dcf82@git.apache.org%3E
Best answer
I got the same error: it comes down to the Spark version. This is fixed in Spark 1.5.2, but I was using 1.3. What I did was switch from spark-shell to spark-submit and change the Kryo serializer configuration. Here is my code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.fpm.FPGrowth
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer

object fpgrowth {
  def main(args: Array[String]) {
    // Registering the collection classes from the serialization trace with
    // Kryo avoids the error; registerKryoClasses also switches
    // spark.serializer to KryoSerializer.
    val conf = new SparkConf().setAppName("Spark FPGrowth")
      .registerKryoClasses(
        Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])
      )
    val sc = new SparkContext(conf)

    // One transaction per line, items separated by single spaces.
    val data = sc.textFile("<path to file.txt>")
    val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

    val fpg = new FPGrowth()
      .setMinSupport(0.2)
      .setNumPartitions(10)
    val model = fpg.run(transactions)

    model.freqItemsets.collect().foreach { itemset =>
      println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
    }
  }
}
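As for the other part of the question (viewing or saving the results): instead of collect()-ing everything to the driver and printing, you can write the itemsets straight to text files from the RDD. A minimal sketch, with a placeholder output path:

// Format each frequent itemset as one line of text and write the RDD out.
model.freqItemsets
  .map(itemset => itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
  .saveAsTextFile("<path to output directory>")

Since this runs through spark-submit, package the object above into a jar and launch it with something like spark-submit --class fpgrowth <your jar> (the exact invocation depends on your build).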