forked from alfredolainez/bpr-spark
distbpr.py
import random

import numpy as np
from tqdm import tqdm


def _optimize_partition(user_ratings, prod_mat, nb_prods, l2_reg=0.001,
                        alpha=0.1, negative_samples=30, num_samples=20000):
    """Run BPR stochastic gradient ascent over one partition of users."""
    # Materialize the partition iterator so it can be traversed twice.
    user_ratings = list(user_ratings)
    # All observed (user, positive item) pairs in this partition.
    ratings = (
        (u_id, prod)
        for (u_id, (products, _)) in user_ratings
        for prod in products
    )
    user_vectors = {u: v for (u, (_, v)) in user_ratings}
    # Repeat each positive pair once per negative sample ...
    pos_repeated = (
        pair
        for repeats in ([x] * negative_samples for x in ratings)
        for pair in repeats
    )
    # ... and attach a uniformly drawn negative item (items are 1-indexed).
    triples = [
        (u_id, pos_id, np.random.randint(nb_prods) + 1)
        for (u_id, pos_id) in pos_repeated
    ]
    random.shuffle(triples)
    for u_id, pos_id, neg_id in tqdm(triples[:num_samples]):
        u_vector = user_vectors[u_id]
        pos_vector = prod_mat[pos_id].copy()
        neg_vector = prod_mat[neg_id].copy()
        x_uij = u_vector.dot(pos_vector) - u_vector.dot(neg_vector)
        # sigmoid(-x_uij); this form stays finite for large |x_uij|
        scale = 1.0 / (1.0 + np.exp(x_uij))
        # Gradient ascent on BPR-OPT, with all three updates computed from the
        # pre-update vectors; the L2 term is subtracted so that l2_reg > 0
        # shrinks the parameters.
        prod_mat[pos_id] += alpha * (scale * u_vector - l2_reg * pos_vector)
        prod_mat[neg_id] += alpha * (-scale * u_vector - l2_reg * neg_vector)
        user_vectors[u_id] = u_vector + alpha * (
            scale * (pos_vector - neg_vector) - l2_reg * u_vector)
    # Ship the locally updated item matrix and user vectors back to the driver
    # (dict views do not pickle, hence the list()).
    yield (prod_mat, list(user_vectors.items()))


def optimizeMF(ratings, rank=10, nb_iter=10, nb_partitions=4,
               num_samples=100000, l2_reg=0.001, alpha=0.1,
               negative_samples=30):
    """Optimize a matrix factorization model with BPR.

    A usage sketch appears at the end of this module.

    Args:
    -----
    ratings: RDD of (user, item) implicit interactions
    rank: latent factor dimension
    nb_iter: number of SGD iterations (one distributed pass each)
    nb_partitions: number of user partitions to distribute over
    num_samples: number of SGD samples drawn per partition (|D_S| in the paper)
    negative_samples: number of negative samples per positive example
    l2_reg: L2 regularization strength
    alpha: learning rate

    Returns:
    --------
    (userMat, itemMat)
    """
    nb_prods = ratings.map(lambda r: r[1]).max()
    ratings_by_user = ratings.groupByKey().persist()

    def make_vec(user_products):
        # Attach a randomly initialized latent vector to each user.
        u_id, products = user_products
        return (u_id, (products, np.random.uniform(size=rank)))

    user_ratings = ratings_by_user.map(make_vec).persist()
    ratings_partitioned = user_ratings.partitionBy(nb_partitions).persist()
    prod_mat = np.random.uniform(size=(nb_prods + 1, rank))

    for _ in range(nb_iter):
        # Each partition samples (u, i, j) triples and applies the BPR updates
        # to its own copy of the item matrix and to its users' vectors.
        result = ratings_partitioned.mapPartitions(
            lambda partition: _optimize_partition(
                user_ratings=partition,
                prod_mat=prod_mat,
                nb_prods=nb_prods,
                num_samples=num_samples,
                l2_reg=l2_reg,
                alpha=alpha,
                negative_samples=negative_samples
            )
        ).persist()  # reused below for the item matrix and the user vectors
        # Average the per-partition item matrices.
        prod_mat = result.map(
            lambda x: x[0]
        ).reduce(
            lambda x, y: x + y
        ) / result.count()
        # Re-attach the updated user vectors to the grouped ratings for the
        # next iteration.
        user_vecs_rdd = result.map(lambda x: x[1]).flatMap(lambda x: x)
        ratings_partitioned = ratings_by_user.join(user_vecs_rdd)
        result.unpersist()

    # Collect the user vectors into a dense matrix (only needed for evaluation).
    nb_users = ratings.map(lambda x: x[0]).max()
    user_mat = np.random.uniform(size=(nb_users + 1, rank))
    user_vectors = (
        (u_id, vector)
        for (u_id, (products, vector)) in ratings_partitioned.toLocalIterator()
    )
    for u, v in user_vectors:
        user_mat[u] = v
    return (user_mat, prod_mat)
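

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): one way optimizeMF might be driven from a
# small script. The input path "ratings.csv" and its "user,item" line layout
# are assumptions made for this example, not part of the original module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from pyspark import SparkContext

    sc = SparkContext(appName="distbpr-example")

    # Hypothetical input: one "user,item" pair per line, with positive integer
    # ids (items are treated as 1-indexed by optimizeMF).
    ratings = sc.textFile("ratings.csv").map(
        lambda line: tuple(int(x) for x in line.split(",")))

    user_mat, prod_mat = optimizeMF(ratings, rank=10, nb_iter=5,
                                    nb_partitions=4, num_samples=100000)

    # Scores for user u against every item are then a single dot product, e.g.
    # scores = prod_mat.dot(user_mat[u])
    sc.stop()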