해당 글은 교재 'Python을 이용한 개인화 추천시스템' 실습 과정을 정리한 글로, 이전 글과 이어집니다! 자세한 내용이 궁금하시면 교재와 글을 참고해주세요!
https://jhklee-coder.tistory.com/75
MF (matrix factorization) 기반 추천 1 [ReSys]
추천을 위한 알고리즘은 크게 메모리 기반(memory-based)과 모델 기반(model-based)으로 나눌 수 있다. 메모리 기반 알고리즘은 추천을 위한 데이터를 모두 메모리에 갖고 있으면서 추천이 필요할 때마
jhklee-coder.tistory.com
🔍 SGD MF 기본 알고리즘
# 데이터 불러오기
import numpy as np
import pandas as pd
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('C:/.../u 1.data', names=r_cols, sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int)
# MF class (행렬요인화 클래스)
class MF():
def __init__(self, ratings, K, alpha, beta, iterations, verbose=True):
# 생성자.
self.R = np.array(ratings)
self.num_users, self.num_items = np.shape(self.R)
self.K = K # 잠재요인
self.alpha = alpha # 학습률
self.beta = beta # 정규화 계수
self.iterations = iterations # SGD 반복횟수
self.verbose = verbose # 중간 학습과정의 출력 여부
def rmse(self):
# 오차 제곱 계산 함수.
xs, ys = self.R.nonzero()
self.predictions = []
self.errors = []
for x, y in zip(xs, ys):
prediction = self.get_prediction(x, y)
self.predictions.append(prediction)
self.errors.append(self.R[x, y] - prediction)
self.predictions = np.array(self.predictions)
self.errors = np.array(self.errors)
return np.sqrt(np.mean(self.errors**2))
def train(self):
# 정해진 반복 횟수만큼 (2), (4) (-> 이전 글 참고!) 식을 사용해 P, Q, bu, bd를 업데이트하는 함수.
self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))
# Initializing the bias terms
self.b_u = np.zeros(self.num_users)
self.b_d = np.zeros(self.num_items)
self.b = np.mean(self.R[self.R.nonzero()])
# List of training samples
rows, columns = self.R.nonzero()
self.samples = [(i, j, self.R[i,j]) for i, j in zip(rows, columns)]
# Stochastic gradient descent for given number of iterations
training_process = []
for i in range(self.iterations):
np.random.shuffle(self.samples)
self.sgd()
rmse = self.rmse()
training_process.append((i+1, rmse))
if self.verbose:
if (i+1) % 10 == 0:
print("Iteration: %d ; Train RMSE = %.4f " % (i+1, rmse))
return training_process
def get_prediction(self, i, j):
# 평점 예측값 구하는 함수.
prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
return prediction
def sgd(self):
# sgd를 실행하는 함수.
for i, j, r in self.samples:
prediction = self.get_prediction(i, j)
e = (r - prediction)
self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
self.b_d[j] += self.alpha * (e - self.beta * self.b_d[j])
self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i,:])
self.Q[j, :] += self.alpha * (e * self.P[i, :] - self.beta * self.Q[j,:])
# 전체 데이터 사용 MF
R_temp = ratings.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
mf = MF(R_temp, K=30, alpha=0.001, beta=0.02, iterations=100, verbose=True)
train_process = mf.train()
최종 결과 : RMSE = 0.8825. (단, 동일 데이터를 train 함과 동시에 test했기 때문에 각각의 데이터셋을 분리시켜줄 필요가 있다.
🔍 Train/Test 셋이 분리된 MF 알고리즘
import numpy as np
import pandas as pd
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('C:/... /u1.data', names=r_cols, sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int) # timestamp 제거
from sklearn.utils import shuffle
TRAIN_SIZE = 0.75
ratings = shuffle(ratings, random_state=1)
cutoff = int(TRAIN_SIZE * len(ratings))
ratings_train = ratings.iloc[:cutoff]
ratings_test = ratings.iloc[cutoff:]
class NEW_MF():
# 새로운 class 정의
def __init__(self, ratings, K, alpha, beta, iterations, verbose =True):
self.R = np.array(ratings)
item_id_index = [] # 아이템 인덱스 매핑
index_item_id = []
for i, one_id in enumerate(ratings):
item_id_index.append([one_id. i])
index_item_id.append([i, one_id])
self.item_id_index = dict(item_id_index)
self.index_item_id = dict(index_item_id)
user_id_index = [] # 유저 id 인덱스 매핑
index_user_id = []
for i, one_Id in enumerate(ratings.T):
user_id_index.append([one_id, i])
index_user_id.append([i, one_id])
self.user_id_index = dict(user_id_index)
self.index_user_id = dict(index_user_id)
def set_test(self, ratings_test):
# test set 선정
test_set = []
for i in range(len(ratings_test)):
x = self.user_id_index[ratings_test.iloc[i, 0]]
y = self.item_id_index[ratings_test.iloc[i, 1]]
z = ratings_test.iloc[i, 2]
test_set.append([x, y, z])
self.R[x, y] = 0
self.test_set = test_set
return test_set
def test_rmse(self):
# rmse 계산
error = 0
for one_set in self.test_set:
predicted = self.get_prediction(one_set[0], one_set[1])
error += pow(one_set[2] - predicted, 2)
return np.sqrt(error/len(self.test_set))
def test(self):
# train과 동시에 test set 정확도 계산
self.p = np.random.normal(scale = 1./self.K, size = (self.num_users, self.K))
self.Q = np.random.normal(scale = 1./self.K, size = (self.num_items, self.K))
self.b_u = np.zeros(self.num_users)
self.b_d = np.zeros(self.num_items)
rows, columns = self.R.nonzero()
self.samples = [(i, j, self.R[i, j]) for i, j in zip(rows, columns)]
training_process = []
for i in range(self.iterations):
np.random.shuffle(self.samples)
self.sgd()
rmse1 = self.rmse()
rmse2 = self.test_rmse()
training_process.append((i+1, rmse1, rmse2))
if self.verbose:
if (i + 1) % 10 == 0:
print("Iteration: %d; Train RMSE = %.4f ; Test RMSE = %.4f" % (i+1, rmse1, rmse2))
return training_process
def get_one_prediction(self, user_id, item_id):
return self.get_one_prediction(self.user_id_index[user+id], self.item_id_index[item_id])
def full_prediction(self):
return self.b + self.b_u[:, np.newaxis] + self.b_d[np.newaxis, :] + self.p.dot(self.Q.T)
R_temp = ratings.pivot(index = 'user_id', columns = 'movie_id', values = 'rating').fillna(0)
mf = NEW_MF(R_temp, K = 30, alpha = 0.001, beta = 0.02, iterations = 100, verbose = True)
test_set = mf.set_test(rating_test)
result = mf.test()
print(mf.full_prediction())
print(mf.get_one_prediction(1, 2))
실행 결과 : 유저 아이디가 1인 유저의 경우, 아이템 2에 대해 3.376... 정도의 평점을 매길 것으로 예측이 가능하다.
🔍 MF 의 최적의 파라미터는?? (매개변수 값)
# 최적의 K값 찾기
results = []
index = []
for K in range(50, 261, 10):
print('K =', K)
R_temp = ratings.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
mf = NEW_MF(R_temp, K=K, alpha=0.001, beta=0.02, iterations=300, verbose=True)
test_set = mf.set_test(ratings_test)
result = mf.test()
index.append(K)
results.append(result)
# 최적의 iterations 값 찾기
summary = []
for i in range(len(results)):
RMSE = []
for result in results[i]:
RMSE.append(result[2])
min = np.min(RMSE)
j = RMSE.index(min)
summary.append([index[i], j+1, RMSE[j]])
# 그래프 그리기
import matplotlib.pyplot as plt
plt.plot(index, [x[2] for x in summary])
plt.ylim(0.89, 0.94)
plt.xlabel('K')
plt.ylabel('RMSE')
plt.show()
실행 결과 : K = 200, iterations = 250 정도에서 최적의 RMSE 값이 도출!
'기획(PM & PO) > ✏️ 개발 일지' 카테고리의 다른 글
'MF' 와 '딥러닝' [ReSys] (0) | 2024.04.14 |
---|---|
'FM (Factorization Machines)' 기반 추천 2 [ReSys] (0) | 2024.04.01 |
'FM (Factorization Machines)' 기반 추천 1 [ReSys] (1) | 2024.03.29 |
'MF (matrix factorization)' 기반 추천 1 [ReSys] (0) | 2024.03.24 |