-
[Recommender System] - Python으로 Matrix Factorization 구현하기Recommender System/추천 시스템 2018. 8. 1. 01:43
Matrix Factorization 혹은 Matrix Complementation과 관련된 수많은 변형 알고리즘들이 존재한다. 하지만 본질적으로는 행렬을 분해하고 분해한 행렬을 변수로써 학습하는 것이다. 업데이트 방법에 따라 NNMF, ALS 등으로 나뉘고, cost를 정의하는 방식에 따라 Implicit, explicit data를 처리하는 다른 알고리즘이 된다. 그래서 본질을 제대로 이해하고 있다면, 최근 Kaggle 등에서 유행하고 있는 여러 Matrix Factorization의 최신 알고리즘도 크게 다르지 않다는 것을 알 수 있다.
본 포스팅에서는 Python 코드를 이용하여 banila-MF를 구현할 것이다. (Matrix Factorization에 대한 이론적 설명 참고)
1. Basic Concept
MF의 기본 아이디어는 위와 같은 Matrix의 물음표 부분들을 행렬 분해로 채워나간다는 것이다.
먼저 Input User-Item Rating Matrix를 R이라 정의하고, 이를 User Latent Vector P, Item Latent Vector Q로 분해할 수 있다. 분해된 행렬을 다시 곱하면 원래의 모양 R이 만들어지고, 물음표는 추론된 값으로 채워지게 된다. 여기서 P와 Q의 요소들이 학습해야하는 변수라고 할 수 있다. 결과적으로 추론된 값은 predicted rating 이라고 할 수 있고, 수식은 아래와 같다. 우리의 목적은 predicted rating을 뱉어내도록 하는 p_ik, q_jk를 학습하는 것이다.
2. 수식들의 정의
개념을 확립했다면, 이를 코드로 옮기기 위한 수식들을 정의해야 한다. 우선적으로 정의해야 하는 것은 cost이다. cost는 간단히 rating과 predicted rating간의 RMSE를 이용하면 된다. 그리고 이에 대한 p_ik, q_jk들의 gradient를 구하고, 정규화 파라미터를 일반적인 방법으로 cost에 추가해준 뒤 순차적인 gradient descent를 진행하는 코드를 짜면 된다. 지금 언급한 코드에 필요한 수식들은 아래와 같다.
3. 파이썬 구현
파이썬으로 구현한 코드는 다음과 같다. batch를 조정하지 않은 gradient descent를 사용하였고, 변수들은 Numpy를 이용하여 보정하지 않은 완전한 랜덤 변수로 초기화하였다. 정규화 파라미터는 위의 수식과 동일한 수식을 사용하였고 latent 보정을 위해 maxrix global bias를 사용하였다. 코드는 바로 실행 가능하도록 작성되었다.
import numpy as np class MatrixFactorization(): def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False): """ :param R: rating matrix :param k: latent parameter :param learning_rate: alpha on weight update :param reg_param: beta on weight update :param epochs: training epochs :param verbose: print status """ self._R = R self._num_users, self._num_items = R.shape self._k = k self._learning_rate = learning_rate self._reg_param = reg_param self._epochs = epochs self._verbose = verbose def fit(self): """ training Matrix Factorization : Update matrix latent weight and bias 참고: self._b에 대한 설명 - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용 - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌. :return: training_process """ # init latent features self._P = np.random.normal(size=(self._num_users, self._k)) self._Q = np.random.normal(size=(self._num_items, self._k)) # init biases self._b_P = np.zeros(self._num_users) self._b_Q = np.zeros(self._num_items) self._b = np.mean(self._R[np.where(self._R != 0)]) # train while epochs self._training_process = [] for epoch in range(self._epochs): # rating이 존재하는 index를 기준으로 training for i in range(self._num_users): for j in range(self._num_items): if self._R[i, j] > 0: self.gradient_descent(i, j, self._R[i, j]) cost = self.cost() self._training_process.append((epoch, cost)) # print status if self._verbose == True and ((epoch + 1) % 10 == 0): print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost)) def cost(self): """ compute root mean square error :return: rmse cost """ # xi, yi: R[xi, yi]는 nonzero인 value를 의미한다. # 참고: http://codepractice.tistory.com/90 xi, yi = self._R.nonzero() predicted = self.get_complete_matrix() cost = 0 for x, y in zip(xi, yi): cost += pow(self._R[x, y] - predicted[x, y], 2) return np.sqrt(cost) / len(xi) def gradient(self, error, i, j): """ gradient of latent feature for GD :param error: rating - prediction error :param i: user index :param j: item index :return: gradient of latent feature tuple """ dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :]) dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :]) return dp, dq def gradient_descent(self, i, j, rating): """ graident descent function :param i: user index of matrix :param j: item index of matrix :param rating: rating of (i,j) """ # get error prediction = self.get_prediction(i, j) error = rating - prediction # update biases self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i]) self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j]) # update latent feature dp, dq = self.gradient(error, i, j) self._P[i, :] += self._learning_rate * dp self._Q[j, :] += self._learning_rate * dq def get_prediction(self, i, j): """ get predicted rating: user_i, item_j :return: prediction of r_ij """ return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T) def get_complete_matrix(self): """ computer complete matrix PXQ + P.bias + Q.bias + global bias - PXQ 행렬에 b_P[:, np.newaxis]를 더하는 것은 각 열마다 bias를 더해주는 것 - b_Q[np.newaxis:, ]를 더하는 것은 각 행마다 bias를 더해주는 것 - b를 더하는 것은 각 element마다 bias를 더해주는 것 - newaxis: 차원을 추가해줌. 1차원인 Latent들로 2차원의 R에 행/열 단위 연산을 해주기위해 차원을 추가하는 것. :return: complete matrix R^ """ return self._b + self._b_P[:, np.newaxis] + self._b_Q[np.newaxis:, ] + self._P.dot(self._Q.T) def print_results(self): """ print fit results """ print("User Latent P:") print(self._P) print("Item Latent Q:") print(self._Q.T) print("P x Q:") print(self._P.dot(self._Q.T)) print("bias:") print(self._b) print("User Latent bias:") print(self._b_P) print("Item Latent bias:") print(self._b_Q) print("Final R matrix:") print(self.get_complete_matrix()) print("Final RMSE:") print(self._training_process[self._epochs-1][1]) # run example if __name__ == "__main__": # rating matrix - User X Item : (7 X 5) R = np.array([ [1, 0, 0, 1, 3], [2, 0, 3, 1, 1], [1, 2, 0, 5, 0], [1, 0, 0, 4, 4], [2, 1, 5, 4, 0], [5, 1, 5, 4, 0], [0, 0, 0, 1, 0], ]) # P, Q is (7 X k), (k X 5) matrix factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=300, verbose=True) factorizer.fit() factorizer.print_results()
'Recommender System > 추천 시스템' 카테고리의 다른 글
[Recommender System] - 개인화 추천 시스템의 최신 동향 (7) 2019.06.17 [Recommender System] - Autoencoder를 이용한 차원 축소 기법 (2) 2019.05.20 [Recommender System] - MF(Matrix Factorization) 모델과 ALS(Alternating Least Squares) (2) 2018.07.16 [Recommender System] - Spark로 연관 규칙(Association Rule) 구현하기 (2) 2018.06.25 [Recommender System] - 추천 시스템에 사용되는 알고리즘들 (12) 2018.05.12 댓글