[python] 추천시스템 - CF - MF(Matrix Factorization, 행렬 분해)

2021. 5. 21. 09:41Python/문법

Ver. Jupyter Notebook (Anaconda3)

 

▶ 추천시스템 - CF - MF(Matrix Factorization, 행렬 분해)

 ● 잠재 요인 협업 필터링

   - 사용자, 아이템 평점 행렬 속에 숨어있는 잠재 요인을 추출하여 예측 할 수 있는 기법

   - SVD와 비슷한 개념이지만 다름 (SVD는 추천시스템에 사용하지 못함)

   - KNN 방식보다 정확함

   - 넷플릭스에서 사용함

   - 요약: 가진 데이터로 없는 데이터를 유추

 

 

# 임의의 3, 4 행렬

R = np.array([[4, 2, np.NaN, 2,],
              [np.NaN, 5, 4, np.NaN,],
              [1, np.NaN, 3, 4,]])

 

# K = 3으로 학습

num_users, num_items = R.shape

K=3  # 잠재 요인은 3개

print(num_users) # M (행)
print(num_items) # N (열)

 

# P, Q에 K만큼의 행렬 생성 후 정규분포 랜덤값 부여

np.random.seed(1)

P = np.random.normal(scale=1./K, size=(num_users, K))  # 3X3 P행렬
Q = np.random.normal(scale=1./K, size=(num_items, K))  # 4X3 Q행렬

print(P,'\n')
print(Q)

 

# 원데이터 R과 예측 행렬 간 오차를 구하는 함수, R 행렬에 비어있지 않는 값: non_zeros

from sklearn.metrics import mean_squared_error

def get_rmse(R, P, Q, non_zeros):
    error = 0
    # 두개의 분해된 행렬 P와 Q.T의 내적으로 예측 R 행렬 생성
    full_pred_matrix = np.dot(P, Q.T)
    
    # 실제 R 행렬에서 널이 아닌 값의 위치 인덱스 추출하여 실제 R 행렬과 예측 행렬의 RMSE 추출
    x_non_zero_ind = [non_zero[0] for non_zero in non_zeros]
    y_non_zero_ind = [non_zero[1] for non_zero in non_zeros]
    R_non_zeros = R[x_non_zero_ind, y_non_zero_ind]
    full_pred_matrix_non_zeros = full_pred_matrix[x_non_zero_ind, y_non_zero_ind]
      
    mse = mean_squared_error(R_non_zeros, full_pred_matrix_non_zeros)
    rmse = np.sqrt(mse)
    
    return rmse

 

# R > 0 인 행 위치, 열 위치, 값을 non_zeros 리스트에 저장

non_zeros = [ (i, j, R[i,j]) for i in range(num_users) for j in range(num_items) if R[i,j] > 0 ]
non_zeros

 

# P, Q 매트릭스를 계속 업데이트(경사하강법)

steps=10000
learning_rate=0.01
r_lambda=0.01

for step in range(steps):  # 1000회 업데이트
    for i, j, r in non_zeros:
        
        # 실제 값과 예측 값의 차이인 오류 값 구함
        eij = r - np.dot(P[i, :], Q[j, :].T)
        
        # Regularization을 반영한 SGD(확률적 경사하강법) 업데이트 공식 적용
        P[i,:] = P[i,:] + learning_rate * ( eij * Q[j,:] - r_lambda*P[i,:] )
        Q[j,:] = Q[j,:] + learning_rate * ( eij * P[i,:] - r_lambda*Q[j,:] )

    rmse = get_rmse(R, P, Q, non_zeros)
    if (step % 50) == 0 :
        print("### iteration step : ", step," rmse : ", rmse)

 

# 예측

pred_matrix = np.dot(P, Q.T)
print('예측 행렬:\n', np.round(pred_matrix, 3))