我正在使用这里提供的MovieLens数据集为电影提供指令系统:
http://grouplens.org/datasets/movielens/
为了计算此命令系统,我在scala中使用Flink的ML库,尤其是ALS算法(org.apache.flink.ml.recommendation.ALS
)。
我首先将电影的评分映射到DataSet[(Int, Int, Double)]
,然后创建trainingSet
和testSet
(请参见下面的代码)。
我的问题是,当我在整个数据集(所有评级)中使用ALS.fit
函数时没有错误,但是如果我只删除一个评级,则fit函数将不再起作用,而我不会不明白为什么。
你有什么想法? :)
使用的代码:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
预处理标量
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
处理标量
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
“但是如果我只删除一个评级”
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
错误 :
2015年6月19日15:00:24 CoGroup(CoGroup at org.apache.flink.ml.recommendation.ALS $ .updateFactors(ALS.scala:570))(4/4)切换为FAILED
java.lang.ArrayIndexOutOfBoundsException:5
在org.apache.flink.ml.recommendation.ALS $ BlockRating.apply(ALS.scala:358)
在org.apache.flink.ml.recommendation.ALS $$ anon $ 111.coGroup(ALS.scala:635)
在org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
...
最佳答案
问题是first
运算符与Flink的setTemporaryPath
实现的ALS
参数结合使用。为了理解问题,让我快速解释一下阻塞ALS算法是如何工作的。
交替最小二乘的阻塞实现首先将给定的评分矩阵按用户和逐项划分为块。对于这些块,将计算路由信息。该路由信息说出哪个用户/项目块分别从哪个项目/用户块接收哪个输入。之后,开始ALS迭代。
由于Flink的基础执行引擎是并行流数据流引擎,因此它试图以流水线方式执行尽可能多的数据流部分。这要求管道的所有操作员都同时在线。这样做的好处是Flink避免实现中间结果,中间结果可能过大。缺点是可用内存必须在所有运行的操作员之间共享。在单个DataSet
元素(例如用户/项目块)的大小相当大的ALS情况下,这是不希望的。
为了解决此问题,如果您设置了temporaryPath
,则并非所有实现的操作符都会同时执行。该路径定义了可以存储中间结果的位置。因此,如果您定义了临时路径,则ALS
首先计算用户块的路由信息并将其写入磁盘,然后它计算项目块的路由信息并将其写入磁盘,最后但并非最不重要的是它开始ALS迭代,并从临时路径中读取路由信息。
用户和物料块的工艺路线信息的计算均取决于给定的评级数据集。在您的情况下,当您计算用户路由信息时,它将首先读取评级数据集并在其上应用first
运算符。 first
运算符从基础数据集中返回n
任意元素。现在的问题是Flink不存储此first
操作的结果来计算项目路由信息。相反,当您开始计算项目路由信息时,Flink将从其源开始重新执行数据流。这意味着它将从磁盘读取收视率数据集,并在其上再次应用first
运算符。与第一个first
操作的结果相比,在许多情况下,这将为您提供不同的评级。因此,生成的路由信息不一致并且ALS
失败。
您可以通过具体化first
运算符的结果并将此结果用作ALS
算法的输入来规避该问题。对象FlinkMLTools
包含方法persist
,该方法采用DataSet
,将其写入给定的路径,然后返回一个新的DataSet
,该DataSet
读取刚刚编写的temporaryPath
。这使您可以分解结果数据流图。
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
或者,您可以尝试保持
first
不变。然后以流水线方式执行所有步骤(路由信息计算和als迭代)。这意味着用户和项目路由信息计算都使用由first
运算符得出的相同输入数据集。Flink社区当前正在努力将操作员的中间结果保留在内存中。这将允许固定运算符的结果,这样就不会对其进行两次计算,因此,由于其不确定性,因此不会给出不同的结果。