I am trying to cache the product features and user features in order to speed up predictions with a Spark MatrixFactorizationModel.

What I did:


Train the ALS model.
Save the model (training and saving are sketched below).
Load the model and cache the user features and product features.
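For reference, a minimal sketch of the first two steps, assuming a ratings RDD and placeholder values for rank and the number of iterations:

from pyspark.mllib.recommendation import ALS, Rating

# ratings is assumed to be an RDD of Rating(user, product, rating) records; the values here are only examples
ratings = sc.parallelize([Rating(1, 10, 4.0), Rating(1, 20, 3.0), Rating(2, 10, 5.0)])

# rank and iterations are placeholders, not tuned values
model = ALS.train(ratings, rank=10, iterations=10)
model.save(sc, Path)  # Path is the same location later passed to MatrixFactorizationModel.load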


Code snippet:

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import MatrixFactorizationModel

spark_config = SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.cores.max', '4')])

sc = SparkContext(conf=spark_config)

# Load the saved ALS model and try to cache its latent factors
self.als_recommender = MatrixFactorizationModel.load(sc, Path)
self.als_recommender.userFeatures().cache()
self.als_recommender.productFeatures().cache()


And I still get the same warnings, and predictions are slow:

WARN MatrixFactorizationModelWrapper: User factor is not cached. Prediction could be slow.
WARN MatrixFactorizationModelWrapper: Product factor is not cached. Prediction could be slow.

Best answer

This seems to be a follow-up to your previous question, The prediction time of spark matrix factorization.

Here is the trick: you need to perform an action on the cached features so that they are actually materialized before predictions run (in Scala):

// count() is an action and forces the cached RDDs to be materialized
als_recommender.productFeatures.cache()
als_recommender.productFeatures.count()
als_recommender.userFeatures.cache()
als_recommender.userFeatures.count()


This way, you force Spark to load the data into the cache.
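To make the lazy behaviour concrete, here is a generic sketch with an arbitrary RDD (not the model's factors): cache() only marks the RDD, and the data is actually materialized the first time an action such as count() runs over it.

numbers = sc.parallelize(range(1000)).map(lambda x: x * x)
numbers.cache()   # only marks the RDD as cacheable; nothing is computed yet
numbers.count()   # first action: the partitions are computed and stored in memory
numbers.count()   # later actions are served from the cached partitions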

But even then, you will not get much faster predictions. I suggest reading the following posts to understand the main challenges behind what you are trying to achieve:


Time/Space Complexity challenge in Recommendation System ?
Recommendation System to integrate with an Android app


Edit: the code above does not work in PySpark, because the model actually uses a JavaRDD under the hood.

loaded_model = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

# Each call to userFeatures()/productFeatures() returns a fresh Python RDD wrapper
# around the Java RDD, so the cache flag set on one wrapper is not visible on the next call.
loaded_model.userFeatures().cache()
print("user features count : {}".format(loaded_model.userFeatures().count()))
print("user features cached : {}".format(loaded_model.userFeatures().is_cached))
# user features count : 4
# user features cached : False

loaded_model.productFeatures().cache()
print("product features count : {}".format(loaded_model.productFeatures().count()))
print("product features cached : {}".format(loaded_model.productFeatures().is_cached))
# product features count : 4
# product features cached : False


To work around this, we need to cache the underlying JavaRDD:

# Persist the Java-side user factors directly, through the JVM gateway
loaded_model._java_model.userFeatures().persist(sc._jvm.org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK())

loaded_model_storage_lvl = loaded_model._java_model.userFeatures().getStorageLevel()
print("use disk : {}".format(loaded_model_storage_lvl.useDisk()))
print("use memory : {}".format(loaded_model_storage_lvl.useMemory()))
print("use off heap : {}".format(loaded_model_storage_lvl.useOffHeap()))
print("deserialized : {}".format(loaded_model_storage_lvl.deserialized()))
print("replication  : {}".format(loaded_model_storage_lvl.replication()))
# use disk : True
# use memory : True   # ==> the RDD is indeed persisted in memory and on disk
# use off heap : False
# deserialized : True
# replication  : 1
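
Presumably the product factors need the same treatment before calling the prediction API. A hedged sketch, where the (user, product) pairs RDD is only a placeholder:

# Persist the Java-side product factors with the same pattern as above
loaded_model._java_model.productFeatures().persist(sc._jvm.org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK())

# Example prediction call; user_product_pairs is a placeholder RDD of (user, product) tuples
user_product_pairs = sc.parallelize([(1, 10), (1, 20), (2, 10)])
predictions = loaded_model.predictAll(user_product_pairs)
print(predictions.take(3))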


PS: for more details, see the Spark source code.

Regarding "python - Caching user and product latent features in PySpark to improve prediction time", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45836691/
