# https://udemy.com/recommender-systems
# https://deeplearningcourses.com/recommender-systems

# Notes:
# - You may have trouble running the full dataset on just your local machine.
# - If you want to know what's in an RDD, use .take(n), e.g.:
#     tmp = rdd.take(5)
#     print(tmp)

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# If Spark needs more memory, uncomment these; they sit ahead of the
# SparkContext construction below.
# SparkContext.setSystemProperty('spark.driver.memory', '10g')
# SparkContext.setSystemProperty('spark.executor.memory', '10g')

# Single shared context for the whole script, using the "local" master.
sc = SparkContext(master="local", appName="Your App Name Here")


def _parse_rating(line):
    """Build a Rating from the first three comma-separated fields of a line."""
    fields = line.split(',')
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]))


# Read the ratings file as an RDD of text lines.
# Smaller alternative for quick experiments:
# data = sc.textFile("../large_files/movielens-20m-dataset/small_rating.csv")
data = sc.textFile("../large_files/movielens-20m-dataset/rating.csv.gz")

# The first line is the header; drop every row equal to it.
header = data.first()
data = data.filter(lambda row: row != header)

# One Rating object per remaining line.
ratings = data.map(_parse_rating)

# Randomly partition the ratings with weights 0.8 (train) and 0.2 (test).
train, test = ratings.randomSplit([0.8, 0.2])

# Fit the ALS matrix-factorization model on the training split only.
K = 10  # passed to ALS as the rank
epochs = 10  # passed to ALS as the number of iterations
model = ALS.train(train, rank=K, iterations=epochs)

# evaluate the model


def _key_by_user_movie(record):
    """Re-key a (user_id, movie_id, value) record as ((user_id, movie_id), value)."""
    user_id, movie_id, value = record
    return (user_id, movie_id), value


def _squared_error(row):
    """Return the squared error of one ((user_id, movie_id), (rating, prediction)) row."""
    _, (rating, prediction) = row
    return (rating - prediction) ** 2


def _mean_squared_error(fitted_model, dataset):
    """Return the mean squared error of ``fitted_model`` over ``dataset``.

    ``dataset`` is an RDD of (user_id, movie_id, rating) records. The model is
    asked to predict every (user_id, movie_id) pair in it, and predictions are
    matched back to the true ratings by that pair.
    """
    user_movie_pairs = dataset.map(lambda r: (r[0], r[1]))
    predictions = fitted_model.predictAll(user_movie_pairs).map(_key_by_user_movie)
    # Joins on the (user_id, movie_id) key; each row of the result is
    # ((user_id, movie_id), (rating, prediction)). The join keeps only keys
    # present on both sides, so a pair without a prediction is left out of
    # the mean.
    rates_and_preds = dataset.map(_key_by_user_movie).join(predictions)
    return rates_and_preds.map(_squared_error).mean()


# train
mse = _mean_squared_error(model, train)
print("***** train mse: %s *****" % mse)

# test
mse = _mean_squared_error(model, test)
print("***** test mse: %s *****" % mse)