- #!/usr/bin/env python
- # coding=utf-8
- '''
- 运行命令/yourpath/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile
- movieLensDataDir 电影评分数据集目录 比如 ml-1m/
- personalRatingsFile 需要推荐的某用户的评价数据 格式参考ratings.dat
- '''
- import sys
- import itertools
- from math import sqrt
- from operator import add
- from os.path import join, isfile, dirname
- from pyspark import SparkConf, SparkContext
- from pyspark.mllib.recommendation import ALS
def parseRating(line):
    """
    Parse a rating record in MovieLens format ``userId::movieId::rating::timestamp``.

    Fields after the timestamp (such as the movie title appended in the personal
    ratings file) are ignored.

    :param line: one raw text line; surrounding whitespace/newline is stripped.
    :return: ``(timestamp % 10, (userId, movieId, rating))``. The last digit of the
        timestamp is returned as a key so the caller can split the data set on it.
    :raises ValueError: if a numeric field cannot be parsed (e.g. an unfilled ``?`` rating).
    :raises IndexError: if the line has fewer than four ``::``-separated fields.
    """
    fields = line.strip().split("::")
    # int() instead of the Python-2-only long(): Python 2 promotes large ints to long
    # automatically, and Python 3 has no long() at all.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
def parseMovie(line):
    """
    Turn a MovieLens ``movieId::movieTitle[::...]`` line into ``(movieId, movieTitle)``.

    The line is stripped of surrounding whitespace first; any fields after the
    title (e.g. genres) are discarded.
    """
    parts = line.strip().split("::")
    movieId = int(parts[0])
    title = parts[1]
    return movieId, title
def loadRatings(ratingsFile):
    """
    Load the personal ratings of the user to make recommendations for.

    Each non-blank line is parsed with ``parseRating``
    (``userId::movieId::rating::timestamp[::title]``). Blank lines are skipped,
    and ratings that are not greater than 0 are dropped.

    :param ratingsFile: path of the personal ratings file.
    :return: non-empty list of ``(userId, movieId, rating)`` tuples.

    Exits the process with status 1 if the file does not exist or contains no
    rating greater than 0.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # `with` closes the file even if parseRating raises on a malformed line
    with open(ratingsFile, 'r') as f:
        parsed = [parseRating(line)[1] for line in f if line.strip()]
    # A list (not filter()): on Python 3 filter() returns a lazy iterator that is
    # always truthy, which would make the emptiness check below a no-op.
    ratings = [r for r in parsed if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings
def computeRmse(model, data, n=None):
    """
    Compute the RMSE (Root Mean Squared Error) of ``model`` on ``data``.

    :param model: trained model exposing ``predictAll``, e.g. the result of ``ALS.train``.
    :param data: RDD of ``(userId, movieId, rating)`` tuples to evaluate against.
    :param n: number of records in ``data``. Accepted only for backward compatibility
        and no longer used. Only pairs that survive the join below (i.e. that received
        a prediction) have an error to average, so the mean is taken over those pairs.
        Dividing by ``n`` would count every unpredicted pair as a zero error.
    :return: the RMSE as a float, or ``float("nan")`` when no pair of ``data``
        received a prediction.
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    # inner join on (userId, movieId) -> (predicted, actual) for pairs that have both
    predictionsAndRatings = (
        predictions.map(lambda x: ((x[0], x[1]), x[2]))
        .join(data.map(lambda x: ((x[0], x[1]), x[2])))
        .values()
    )
    # stats() yields count and mean in one pass and, unlike reduce(add), does not
    # raise when nothing matched (each partition contributes an empty counter)
    errorStats = predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).stats()
    if errorStats.count() == 0:
        return float("nan")
    return sqrt(errorStats.mean())
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile")
        sys.exit(1)

    # set up environment
    conf = SparkConf().setAppName("MovieLensALS").set("spark.executor.memory", "1g")
    sc = SparkContext(conf=conf)
    try:
        # load personal ratings; recommendations are made for the userId found there
        # (assumes every line of the personal file uses the same userId -- 0 in the sample)
        myRatings = loadRatings(sys.argv[2])
        myRatingsRDD = sc.parallelize(myRatings, 1)
        myUserId = myRatings[0][0]
        myRatedMovieIds = set(x[1] for x in myRatings)

        movieLensHomeDir = sys.argv[1]
        # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
        ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)
        # movies is a driver-side dict of movieId -> movieTitle
        movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

        numRatings = ratings.count()
        numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
        numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
        print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

        # Split ratings on the last digit of the timestamp: 0-7 -> training (plus
        # myRatings, so the personal user is known to the model), 8-9 -> validation.
        # Both are cached RDDs of (userId, movieId, rating).
        numPartitions = 4
        training = ratings.filter(lambda x: x[0] < 8).values().union(myRatingsRDD).repartition(numPartitions).cache()
        validation = ratings.filter(lambda x: x[0] >= 8).values().repartition(numPartitions).cache()
        numTraining = training.count()
        numValidation = validation.count()
        print("Training: %d, validation: %d" % (numTraining, numValidation))

        # Train models and evaluate them on the validation set. Besides the grid, the
        # setting rank = 10, lambda = 0.45, numIter = 20 (found best in an earlier run)
        # is tried too, so it is used only if it still wins on this validation split.
        ranks = [10, 12]
        lambdas = [0.01, 0.4, 1.0]
        numIters = [10]
        paramGrid = list(itertools.product(ranks, lambdas, numIters)) + [(10, 0.45, 20)]
        bestModel = None
        bestValidationRmse = float("inf")
        bestRank = 0
        bestLambda = -1.0
        bestNumIter = -1
        for rank, lmbda, numIter in paramGrid:
            model = ALS.train(training, rank, numIter, lmbda)
            validationRmse = computeRmse(model, validation, numValidation)
            print("RMSE (validation) = %f for the model trained with " % validationRmse + "rank = %d, lambda = %.4f, and numIter = %d." % (rank, lmbda, numIter))
            if validationRmse < bestValidationRmse:
                bestModel = model
                bestValidationRmse = validationRmse
                bestRank = rank
                bestLambda = lmbda
                bestNumIter = numIter
        if bestModel is None:
            # every candidate produced a NaN/inf RMSE, so nothing can be recommended
            print("No model could be evaluated on the validation set.")
            sys.exit(1)
        print("The best model was trained with rank = %d and lambda = %.4f, and numIter = %d ,and Rmse %.4f" % (bestRank, bestLambda, bestNumIter, bestValidationRmse))

        # Make personalized recommendations: score every movie the user has not rated
        # yet and keep the 5 highest predicted ratings.
        candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
        predictions = bestModel.predictAll(candidates.map(lambda m: (myUserId, m)))
        recommendations = predictions.takeOrdered(5, key=lambda r: -r[2])
        print("Movies recommended for you:")
        for i, r in enumerate(recommendations, 1):
            print("%2d: %s %s" % (i, movies[r[1]], r[2]))
    finally:
        # clean up, also when sys.exit() was called above
        sc.stop()
代码参考：https://github.com/databricks/spark-training/blob/master/machine-learning/python/solution/MovieLensALS.py
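personalRatingsFile 示例。每行格式为 userId::movieId::rating::timestamp::title；运行前须把 ? 替换为评分（MovieLens 使用 1–5 分），填 0 表示没看过，该行会被忽略：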
0::1::?::1400000000::Toy Story (1995)
0::780::?::1400000000::Independence Day (a.k.a. ID4) (1996)
0::590::?::1400000000::Dances with Wolves (1990)
0::1210::?::1400000000::Star Wars: Episode VI - Return of the Jedi (1983)
0::648::?::1400000000::Mission: Impossible (1996)
0::344::?::1400000000::Ace Ventura: Pet Detective (1994)
0::165::?::1400000000::Die Hard: With a Vengeance (1995)
0::153::?::1400000000::Batman Forever (1995)
0::597::?::1400000000::Pretty Woman (1990)
0::1580::?::1400000000::Men in Black (1997)
0::231::?::1400000000::Dumb & Dumber (1994)