Movie Recommendation based on Alternating Least Squares (ALS) with Apache Spark


The underlying assumption of collaborative recommendation is that if a user A has the same opinion as a user B on one item, A is more likely to share B's opinion on a different item x than to share the opinion on x of a randomly chosen user. With collaborative filtering we make predictions (filtering) about one user's interests by collecting preference or taste information from many users (collaborating).

A classic illustration from Wikipedia shows how this works. First, people rate different items (videos, images, games). The system then predicts a user's rating for an item they have not yet rated, building on the existing ratings of other users whose tastes resemble the active user's. In that illustration, the system predicts that the user will not like the video.
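To make the idea concrete before bringing in Spark, here is a tiny sketch in plain Python with hypothetical users, items, and ratings: it predicts a missing rating by copying the opinion of the most similar user.

from math import sqrt

# Toy user-item ratings on a 1-5 scale (all hypothetical)
ratings = {
    'alice': {'image': 5, 'game': 1, 'video': 4},
    'bob':   {'image': 1, 'game': 5, 'video': 2},
    'carol': {'image': 5, 'game': 1},            # 'video' not rated yet
}

def similarity(a, b):
    # Cosine similarity over the items both users have rated
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[i] * b[i] for i in common)
    return dot / (sqrt(sum(a[i] ** 2 for i in common)) *
                  sqrt(sum(b[i] ** 2 for i in common)))

target = ratings['carol']
# The most similar other user who has rated 'video' supplies the prediction
best = max((u for u in ratings if u != 'carol' and 'video' in ratings[u]),
           key=lambda u: similarity(target, ratings[u]))
print(best, '->', ratings[best]['video'])        # alice -> 4

Spark's ALS reaches the same goal by a model-based route: rather than comparing users pairwise, it learns latent factors for every user and item, as we will see below.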

Download the MovieLens datasets

complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
import os
import urllib.request

datasets_path = os.path.join('.', 'input')
os.makedirs(datasets_path, exist_ok=True)  # make sure the target folder exists

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)
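The complete dataset weighs in at a couple hundred megabytes, so when re-running the notebook it can help to skip downloads that already exist. A small optional guard (a convenience, not part of the original flow) looks like this:

import os
import urllib.request

def fetch(url, path):
    # Download url to path only if the file is not already present
    if not os.path.exists(path):
        urllib.request.urlretrieve(url, path)
    return path

fetch(small_dataset_url, small_dataset_path)
fetch(complete_dataset_url, complete_dataset_path)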

Loading and parsing datasets

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommend').getOrCreate()
data = spark.read.csv('input/ml-latest-small/ratings.csv', inferSchema=True, header=True)
data.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
# Count null values per column (note: this sum is Spark's, shadowing the built-in)
from pyspark.sql.functions import col, sum
data.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in data.columns)).show()
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+
# Alternative null count: total rows minus the non-null counts from describe()
from pyspark.sql.functions import lit, col

rows = data.count()
summary = data.describe().filter(col("summary") == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in data.columns)).show()
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|   0.0|    0.0|   0.0|      0.0|
+------+-------+------+---------+
print('No. of rows: %d' % data.count())
data.show(5)
No. of rows: 100004
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows
# count, mean, std, min & max
data.describe().show()
+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            100004|            100004|            100004|              100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|1.1296390869392424E9|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|1.9168582602710962E8|
|    min|                 1|                 1|               0.5|           789652009|
|    max|               671|            163949|               5.0|          1476640644|
+-------+------------------+------------------+------------------+--------------------+
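The timestamp column stores Unix epoch seconds, which is why its mean shows up in scientific notation. As an aside, from_unixtime converts it to a readable date if you ever need one (the model itself never uses this column):

from pyspark.sql.functions import from_unixtime

# Render the epoch-seconds timestamp as a human-readable date (aside only)
data.withColumn('rated_at', from_unixtime('timestamp')) \
    .select('userId', 'movieId', 'rating', 'rated_at').show(3)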
# Split the dataset into training (80%) and test (20%) sets
train_data, test_data = data.randomSplit([0.8, 0.2])

We do this split so we can evaluate how well the model performs, but keep in mind that it is very hard to know conclusively how well a recommender system is truly working for some topics. This is especially true where subjectivity is involved: not everyone who loves Star Wars is going to love Star Trek, even though a recommendation system may suggest otherwise.
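Note that randomSplit is nondeterministic, so the exact RMSE below will vary from run to run. Passing a seed (the value 42 here is arbitrary) makes the split reproducible:

# Reproducible 80/20 split; the seed value is arbitrary
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)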

Alternating Least Squares (ALS)

Spark's MLlib machine-learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS): the user-item rating matrix is factorized into a user-factor matrix and an item-factor matrix by alternately fixing one and solving a regularized least-squares problem for the other (a toy sketch of these updates follows the parameter list). The classic MLlib implementation exposes these parameters; in the newer DataFrame-based API used below, iterations and lambda are named maxIter and regParam:

  • numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
  • rank is the number of latent factors in the model.
  • iterations is the number of iterations to run.
  • lambda specifies the regularization parameter in ALS.
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
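Before running the Spark version, here is a minimal NumPy sketch of the alternating updates on a tiny, fully observed toy matrix (R, rank, and lam below are hypothetical toy values; Spark's real implementation works on sparse data, in parallel blocks, and only sums over observed ratings):

import numpy as np

rng = np.random.default_rng(0)
R = np.array([[5., 4., 1.],   # toy user x item rating matrix
              [4., 5., 1.],
              [1., 1., 5.]])
n_users, n_items = R.shape
rank, lam = 2, 0.1            # number of latent factors and regularization
U = rng.normal(size=(n_users, rank))
V = rng.normal(size=(n_items, rank))

for _ in range(10):
    # Fix V, solve the regularized least-squares problem for U ...
    U = np.linalg.solve(V.T @ V + lam * np.eye(rank), V.T @ R.T).T
    # ... then fix U and solve the symmetric problem for V
    V = np.linalg.solve(U.T @ U + lam * np.eye(rank), U.T @ R).T

print(np.round(U @ V.T, 1))   # should approximately reconstruct R

Each half-step has a closed-form solution, which is what makes ALS easy to parallelize: every user's factor vector (and, in the other half-step, every item's) can be solved independently.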

Let's see this all in action!

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Build the recommendation model using ALS on the training data;
# coldStartStrategy='drop' discards NaN predictions for users or items
# unseen in training, so the evaluator below gets clean input
als = ALS(maxIter=10, regParam=0.1, rank=10, nonnegative=True, coldStartStrategy="drop",
          userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(train_data)
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(5)

print('-'*50)

print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(5)
Factorized user matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.18798763, -0....|
| 20|[-0.002044285, -0...|
| 30|[-0.19509034, -0....|
| 40|[-0.55244017, -0....|
| 50|[-0.3752768, -0.4...|
+---+--------------------+
only showing top 5 rows

--------------------------------------------------
Factorized item matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.1918898, -0.6...|
| 20|[-0.38074443, 0.4...|
| 30|[-0.84450626, -0....|
| 40|[-0.30144945, -0....|
| 50|[-0.664701, -0.73...|
+---+--------------------+
only showing top 5 rows
print('Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:')
model.recommendForAllItems(1).show(5)

print('-'*50)

print('Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:')
model.recommendForAllUsers(1).show(5)
Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:
+-------+----------------+
|movieId| recommendations|
+-------+----------------+
|   1580|[[46,5.1127443]]|
|   5300|[[477,5.093527]]|
|   6620| [[71,4.573213]]|
|   7340|[[656,4.489139]]|
|  32460| [[46,5.134292]]|
+-------+----------------+
only showing top 5 rows

--------------------------------------------------
Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:
+------+-------------------+
|userId|    recommendations|
+------+-------------------+
|   471|  [[3653,5.175575]]|
|   463|[[83411,4.8556952]]|
|   496| [[59684,5.243225]]|
|   148| [[83411,5.363898]]|
|   540|  [[8530,5.641106]]|
+------+-------------------+
only showing top 5 rows
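recommendForAllUsers packs the top-N suggestions into an array of (movieId, rating) structs per user. If a flat table is more convenient downstream, the array can be exploded, e.g.:

from pyspark.sql.functions import explode, col

# One row per (user, recommended movie) instead of an array column
flat = (model.recommendForAllUsers(1)
        .select('userId', explode('recommendations').alias('rec'))
        .select('userId', col('rec.movieId'), col('rec.rating')))
flat.show(5)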

Make predictions on test_data

# Let's see how the model performs on the held-out test data
predictions = model.transform(test_data)
predictions.show()
+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|128291|    148|   3.0| 945456963| 2.6328552|
|194356|    148|   3.5|1195002533| 2.0075881|
|123330|    148|   4.0| 939330967|  3.164426|
|140405|    148|   2.0|1491812543|  2.504307|
|103660|    148|   4.0| 830077276| 3.2916343|
|265662|    148|   3.0| 832686930| 2.6706219|
|128560|    148|   3.0| 833165538| 3.5371344|
|249913|    148|   2.0| 915406133| 2.0586205|
|255659|    148|   5.0| 829576446| 3.6189497|
|219731|    148|   4.0| 833661877| 2.7767935|
| 84661|    148|   3.0| 837059383| 2.6634388|
|219886|    148|   3.0| 915554312| 3.1332464|
| 70913|    148|   5.0| 832703670| 3.3639038|
| 73307|    148|   3.0| 857133054| 3.1618505|
| 49761|    148|   1.0| 842886116|  2.196462|
|117395|    148|   2.0| 945907892| 2.9700744|
|264631|    148|   3.0| 970170090|  2.694469|
|221257|    148|   2.0| 844627287| 1.9263531|
|124567|    148|   3.0| 838905641|  2.998578|
|224850|    148|   3.0|1058985403|  3.010354|
+------+-------+------+----------+----------+
only showing top 20 rows
predictions.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- prediction: float (nullable = true)

Evaluate the predictions

Evaluate the model by computing the RMSE on the test data

# check the root mean squared error
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print('Root mean squared error of the test_data: %.4f' % rmse)
Root mean squared error of the test_data: 0.8274
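RMSE weights large errors quadratically; swapping metricName for 'mae' on the same evaluator reports the mean absolute error instead, which some find easier to interpret:

# Mean absolute error on the same predictions
mae = RegressionEvaluator(metricName='mae', predictionCol='prediction',
                          labelCol='rating').evaluate(predictions)
print('Mean absolute error of the test_data: %.4f' % mae)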

The RMSE describes our error in terms of the star-rating scale: the model's predictions are typically off by roughly 0.83 stars. So now that we have the model, how would we actually supply a recommendation to a user?

The same way we did with the test data! For example:

# look at the user's rating history in the training data
user_history = train_data.filter(train_data['userId']==11)
user_history.show()
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|    11|     50|   5.0|1391658537|
|    11|     70|   1.0|1391656827|
|    11|    169|   3.0|1391657297|
|    11|    296|   5.0|1391658423|
|    11|    785|   3.5|1391656845|
|    11|    923|   5.0|1391658556|
|    11|   1027|   4.5|1391658634|
|    11|   1201|   5.0|1391658440|
|    11|   1408|   5.0|1391658667|
|    11|   2042|   3.5|1391657376|
|    11|   2596|   4.5|1391657543|
|    11|   2762|   3.0|1391658583|
|    11|   3424|   3.0|1391657112|
|    11|   5669|   3.0|1391658601|
|    11|   6598|   5.0|1391657861|
|    11|  26614|   5.0|1391658574|
|    11|  48516|   5.0|1391658450|
|    11|  51084|   4.0|1391657605|
|    11|  58295|   4.5|1391657459|
|    11|  71211|   3.5|1391657720|
+------+-------+------+----------+
only showing top 20 rows
# a list of movies we are considering offering to this user
user_suggest = test_data.filter(test_data['userId'] == 11).select(['movieId', 'userId'])
user_suggest.show()
+-------+------+
|movieId|userId|
+-------+------+
|    126|    11|
|    778|    11|
|   1918|    11|
|  81158|    11|
|  81562|    11|
+-------+------+
# offer movies with a high predicted rating
user_offer = model.transform(user_suggest)
user_offer.orderBy('prediction', ascending=False).show()
+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    778|    11|  4.746132|
|    126|    11|   4.20014|
|  81562|    11|  3.883953|
|   1918|    11| 2.9844015|
|  81158|    11| 2.9751697|
+-------+------+----------+
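Alternatively, instead of hand-picking candidate movies, recommendForUserSubset (available since Spark 2.3) asks the model directly for a user's top-N items:

# Top 5 recommendations for user 11, straight from the model
single_user = train_data.filter(train_data['userId'] == 11) \
                        .select('userId').distinct()
model.recommendForUserSubset(single_user, 5).show(truncate=False)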