I am trying to cache the product features and user features of a Spark MatrixFactorizationModel
to speed up prediction.
What I did:
Trained the ALS model.
Saved the model.
Loaded the model and cached its user features and product features.
Code snippet:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import MatrixFactorizationModel

spark_config = SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.cores.max', '4')])
sc = SparkContext(conf=spark_config)
self.als_recommender = MatrixFactorizationModel.load(sc, Path)
self.als_recommender.userFeatures().cache()
self.als_recommender.productFeatures().cache()
And I still get the same warnings, and prediction is still slow:
WARN MatrixFactorizationModelWrapper: User factor is not cached. Prediction could be slow.
WARN MatrixFactorizationModelWrapper: Product factor is not cached. Prediction could be slow.
Best answer
This appears to be a follow-up to your earlier question, The prediction time of spark matrix factorization.
Here is the trick: you need to run an action on the cached features before they are used for prediction (in Scala):
als_recommender.productFeatures().cache()
als_recommender.productFeatures().count()
als_recommender.userFeatures().cache()
als_recommender.userFeatures().count()
This forces Spark to actually load the data into the cache.
Even then, though, you will not get much faster predictions...
I suggest reading the following posts to understand the main challenges of what you are trying to achieve:
Time/Space Complexity challenge in Recommendation System?
Recommendation System to integrate with an Android app.
Edit: The code above does not work as-is in PySpark, because the model actually uses a JavaRDD under the hood.
loaded_model = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
loaded_model.userFeatures().cache()
print("user features count : {}".format(loaded_model.userFeatures().count()))
print("user features cached : {}".format(loaded_model.userFeatures().is_cached))
# user features count : 4
# user features cached : False
loaded_model.productFeatures().cache()
print("product features count : {}".format(loaded_model.productFeatures().count()))
print("product features cached : {}".format(loaded_model.productFeatures().is_cached))
# product features count : 4
# product features cached : False
To work around this, we need to cache the underlying JavaRDD:
loaded_model._java_model.userFeatures().persist(sc._jvm.org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK())
loaded_model_storage_lvl = loaded_model._java_model.userFeatures().getStorageLevel()
print("use disk : {}".format(loaded_model_storage_lvl.useDisk()))
print("use memory : {}".format(loaded_model_storage_lvl.useMemory()))
print("use off heap : {}".format(loaded_model_storage_lvl.useOffHeap()))
print("deserialized : {}".format(loaded_model_storage_lvl.deserialized()))
print("replication : {}".format(loaded_model_storage_lvl.replication()))
# use disk : True
# use memory : True # ==> It is persisted indeed in memory and disk (line above)
# use off heap : False
# deserialized : True
# replication : 1
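Putting the workaround together: the sketch below (my own helper, not part of the original answer) persists both JVM-side factor RDDs and runs a count() action on each so they are actually materialized. It relies on the private `_java_model` attribute and the Py4J gateway (`sc._jvm`), so it may break across Spark versions:

```python
def cache_model_factors(sc, loaded_model):
    """Persist the JVM-side user and product factor RDDs of a loaded
    MatrixFactorizationModel (a sketch; `_java_model` is private API)."""
    # Fetch the MEMORY_AND_DISK storage level object from the JVM gateway.
    level = sc._jvm.org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK()
    for factors in (loaded_model._java_model.userFeatures(),
                    loaded_model._java_model.productFeatures()):
        factors.persist(level)
        factors.count()  # an action forces materialization into the cache
```

Calling this once right after `MatrixFactorizationModel.load(...)` should leave both factor RDDs cached on the JVM side, so subsequent predictions no longer trigger the "factor is not cached" warnings.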
PS: See the Spark source code for more details.
Regarding "python - Caching users' and products' latent features in PySpark to speed up prediction", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45836691/