API 개발/API 개발 Project

API Project : (2) 영화 추천 API 개발 (대용량 파일 Docker 서버 배포)

신강희 2024. 5. 29. 16:09
728x90

< (2) 영화 추천 API 개발 

(대용량 파일 Docker 서버 배포) >

 

# 데이터 가공은 확인하였고, 이제 Postman에서 API 설계 및 리퀘스트 생성

 

# 이제 작성한 Colab 문을 활용해서 VSC에서 코드 개발 (각각 API 기능에 맞게 resources 폴더 안에 movie.py / recommend.py / user.py 파일로 구분하여 개발)

 

- movie.py 파일

 
from flask import request
from flask_jwt_extended import get_jwt_identity, jwt_required
from flask_restful import Resource

from mysql_connection import get_connection
from mysql.connector import Error

class MovieListResource(Resource) :
    # 영화 전체 리스트 가져오는 API
    @jwt_required()
    def get(self) :

        offset = request.args.get('offset')
        limit = request.args.get('limit')
        order = request.args.get('order')

        user_id = get_jwt_identity()

        try :
           
            connection = get_connection()

            query = '''select m.id, m.title ,
                            count(r.id) as reviewCnt,
                            ifnull(avg(r.rating),0) as avgRating,
                            if( f.id is null ,0 , 1) as isFavorite
                        from movie m
                        left join review r
                            on m.id = r.movieId
                        left join favorite f
                            on m.id = f.movieId and f.userId = %s
                        group by m.id
                        order by '''+order+''' desc
                        limit '''+offset+''','''+limit+''';'''
            record = (user_id , )
           
            cursor = connection.cursor(dictionary=True)

            cursor.execute( query , record )

            result_list = cursor.fetchall()

            cursor.close()
            connection.close()


        except Error as e :
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result' : 'fail',
                    'errer' : str(e)}, 500

        print(result_list)

        i = 0
        for row in result_list :
            result_list[i]['avgRating'] = float(row['avgRating'])
            i = i + 1

        print(result_list)

        return {'result' : 'sucess',
                'items' : result_list,
                'count' : len(result_list)}
   

 

- recommend.py 파일

 
from flask import request
from flask_jwt_extended import get_jwt_identity, jwt_required
from flask_restful import Resource

from mysql_connection import get_connection
from mysql.connector import Error

import pandas as pd
import numpy as np

class MovieRecommendResuorce(Resource) :
    # 영화 추천하는 API
    @jwt_required()
    def get(self) :

        count = request.args.get('count')
        # 쿼리 파라미터로 넘어온 데이터(count)는 문자열이므로
        # 숫자로 사용하려면 캐스팅 한다.
        count = int(count)

        user_id = get_jwt_identity()

        # 영화 리뷰 데이터가 필요하다.
        try :
           
            connection = get_connection()

            query = '''select m.title, r.userId, r.rating
                        from movie m
                        left join review r
                            on m.id = r.movieId;;'''
           
            cursor = connection.cursor(dictionary=True)

            cursor.execute( query )

            result_list = cursor.fetchall()

            df = pd.DataFrame(data=result_list)

            print(df)

            df = df.pivot_table(index='userId', columns='title', values='rating')

            print(df)

            movie_corr = df.corr(min_periods= 50)

            query = '''select m.title, r.rating
                        from review r
                        join movie m
                            on r.movieId = m.id
                        where userId = %s;'''
            record = ( user_id, )
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, record)
            result_list = cursor.fetchall()

            cursor.close()
            connection.close()

        except Error as e :
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result' : 'fail',
                    'errer' : str(e)}, 500
       
        # 내 별점정보를 기반으로, 추천영화 목록을 만든다.
        my_review = pd.DataFrame(data = result_list)

        movie_list = pd.DataFrame()

        for i in np.arange(my_review.shape[0]) :
            title = my_review['title'][i]
            recom_movie = movie_corr[title].dropna().sort_values(ascending=False).to_frame()
            recom_movie.columns = ['correlation']
            recom_movie['weight'] = recom_movie['correlation'] * my_review['rating'][i]
            movie_list = pd.concat([movie_list, recom_movie])

        # 중복추천된 영화, 내가 이미 본 영화는 제거한다.
        # 내가 이미 본 영화는 추천에서 제외한다.
        for title in my_review['title'] :
            if title in movie_list.index :
                movie_list.drop(title, axis = 0, inplace=True)

        movie_list = movie_list.groupby('title')['weight'].max().sort_values(ascending=False)

        if movie_list.shape[0]  < count :
            return{'result' : 'fail', 'error' : '추천영화 갯수가 count 보다 적습니다.'}, 400
       
        # 판다스의 데이터프레임을, JSON으로 바꾼다.
        movie_list = movie_list.to_frame()
        movie_list = movie_list.reset_index()
        movie_list = movie_list.to_dict('records')
       
        return {'result' : 'success',
                'itmes' : movie_list,
                'count' : len(movie_list)}
 

 

- user.py 파일

 
import datetime
from email_validator import EmailNotValidError, validate_email

from flask import request
from flask_jwt_extended import create_access_token, jwt_required, get_jwt
from mysql.connector import Error
from flask_restful import Resource

from mysql_connection import get_connection
from utils import check_password, hash_password

# 회원가입 API
class UserRegisterResource(Resource) :
    def post(self) :
        # 1. 클라이언트가 보낸 데이터를 받아준다.
        # 포스트맨으로 작성한 body 데이터를 json으로 받아온다.
        data = request.get_json()
        print(data)
        # 2. 데이터가 모두 있는지 확인 (두가지 방법)
        # 1) if 문으로 나누어서 작성 (이게 좀더 직관적으로 구분되므로 해당 방법 추천)
        #if 'email' not in data or 'username' not in data or 'password' not in data :
        #    return {"result" : "fail"}, 400
       
        #if data['email'].strip() == '' or data['username'].strip() == '' or data['password'].strip() == '' :
        #    return {"result" : "fail"}, 400
        # 2) if 문 길게 한줄로 작성
        if data.get('email') is None or data.get('email').strip() == '' or \
            data.get('nickname') is None or data.get('nickname').strip() == '' or \
            data.get('password') is None or data.get('password').strip() == ''or \
            data.get('gender') is None:
            return {'result' : 'fail'}, 400

        # 3. 이메일주소 형식이 올바른지 확인
        # 지정된 형식인 EmailNotValidError 사용 (정의된 라이브러리이므로 해당 형식 그대로 사용해야함)
        try :
            validate_email(data['email'])
        except EmailNotValidError as e :
            return {'result' : 'fail', 'error' : str(e)}, 400

        # 4. 비밀번호 길이가 유효한지 체크한다.
        #    예) 비번은 4자리 이상 12 자리 이하
        if len(data["password"]) < 4 or len(data["password"]) > 12 :
            return {'result' : 'fail'}, 400

        # 5. 비밀번호를 암호화 한다.
        password = hash_password( data['password'])
        print(password)

        # 6. DB에 저장한다.
        try :
            connection = get_connection()
            # 이번 쿼리문 작성 주의할점! record에 password는 암호화된 패스워드를 가져와야 하기 때문에 상단에서 암호화한 변수로 지정해줘야함!
            query = '''insert into user
                (nickname, email, password, gender)
                values
                (%s, %s, %s, %s);'''
            record = (data['nickname'], data['email'], password, data['gender'])
            cursor = connection.cursor()
            cursor.execute(query, record)
            connection.commit()

            ### 중요하다!!!
            ### DB에 회원가입하여, user 테이블에 insert 된 후,
            ### 이 user 테이블의 id 값을 가져와야 한다.
            ### 생성한 변수를 마지막 리턴에 작성하여 클라이언트에게 보내준다.
            user_id = cursor.lastrowid

            cursor.close()
            connection.close()

        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result' : 'fail'}, 500
       
        # 6-2. user_id를 바로 클라이언트에게 보내면 안되고,
        ##     JWT 로 암호화 해서, 인증토큰을 보내야 한다.
        ### 토큰 만료시킬때 사용
        # access_token = create_access_token(user_id,
        #                                    expires_delta= datetime.timedelta(minutes=3))
        access_token = create_access_token(user_id, )
           
        # 7. 응답할 데이터를 JSON으로 만들어서 리턴!

        return {"result" : "success", 'access_token' : access_token}
   
        # 이렇게 생성한 user_id는 결국 생성한 레시피를 DB에 저장할때 구분하기 위해서 만든것이므로 생성한 user_id 컬럼을 레시피 파일에 메뉴 생성 API 함수에다가 추가해줘야한다.
        # 이제 수정이든, 삭제든 user_id가 필요해졌는데 이게 보안되지 않으면 제3의 인물이 user_id를 동일하게 해서 생성하거나 삭제할수도 있다. 이게 해킹
        # 그러므로 인증토큰(jwt)이 필요하다! jwt 라이브러리 설치! pip install flask-jwt-extended
        # 하고 결과에 access_token으로 생성한 변수를 추가해 준다.

# 로그인 API
class UserLoginResource(Resource) :
    def post(self) :

        # 1. 클라이언트로부터 데이터를 받는다.
        data = request.get_json()

        if 'email' not in data or 'password' not in data:
            return {'result' : 'fail'}, 400
        if data['email'].strip() == '' or data['password'].strip() == '':
            return {'result' : 'fail'}, 400

        # 2. DB로부터 이메일에 해당하는 유저 정보를 가져온다.
        try :
            connection = get_connection()
            query = '''select *
                from user
                where email = %s;'''
            record = (data['email'] , )
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, record)

            result_list = cursor.fetchall()

            print(result_list)

            cursor.close()
            connection.close()

        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result':'fail', 'error':str(e)},500

        # 3. 회원인지 확인한다.
        if result_list == [] :
            return {'result' : 'fail'} , 401

        # 4. 비밀번호를 체크한다.
        # 유저가 입력한 비번 data['password']
        # DB에 암호화된 비번 result_list[0]['password']
        isCorrect = check_password(data['password'] , result_list[0]['password'])
        if isCorrect == False :
            return {'result' : 'fail'} , 401

        # 5. 유저아이디를 가져온다.
        user_id = result_list[0]['id']

        # 6. JWT 토큰을 만든다.
        # access_token = create_access_token(user_id,
        #                                    expires_delta= datetime.timedelta(minutes=3))
        access_token = create_access_token(user_id,)

        # 7. 클라이언트에 응답한다.

        return {'result' : 'success', 'access_token':access_token}

# 로그아웃 API => 다른 user API들에비해 복잡도가 있다!
# 로그아웃된 토큰을 저장할, set을 만든다.
jwt_blacklist = set()

class UserLogoutResource(Resource) :

    @jwt_required()
    def delete(self) :

        # 메뉴얼대로 작성하는것!
        jti = get_jwt()['jti']
        jwt_blacklist.add(jti)
        return
 

 

# app.py 파일에 API 경로 설정 까지 작성

 
import serverless_wsgi

from flask import Flask
from flask_restful import Api
from flask_jwt_extended import JWTManager

from config import Config
from resources.movie import MovieListResource
from resources.recommend import MovieRecommendResuorce
from resources.user import UserLoginResource, UserLogoutResource, UserRegisterResource, jwt_blacklist

app = Flask(__name__)

app.config.from_object(Config)

# JWT 매니저 초기화
jwt = JWTManager(app)

# 로그아웃된 토큰으로 요청하는 경우, 처리하는 함수 작성
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload) :
    jti = jwt_payload['jti']
    return jti in jwt_blacklist

api = Api(app)

# 경로와 리소스를 연결하는 코드 작성
api.add_resource( UserRegisterResource , '/user/register')
api.add_resource( UserLoginResource , '/user/login')
api.add_resource( UserLogoutResource , '/user/logout')

api.add_resource( MovieListResource, '/movie')
api.add_resource( MovieRecommendResuorce,'/movie/recommend')

def handler(event, context) :
    return serverless_wsgi.handle_request(app, event, context)

if __name__ == '__main__':
    app.run()
 

 

# config, mysql_connection, utils 그리고 .gitignore 파일은 이전 프로젝트 파일을 그대로 복사하여 사용

- 이대로 sls deploy를 실행하면 용량 문제로 서버가 정상적으로 배포되지 않는다. (람다는 배포 서버에 라이브러리 설치가 가능한 용량이 50MB)

- 이때 Docker를 사용하여 서버에 배포

- 참고 : https://sorktjrrb.tistory.com/154

 

Docker : (1) 설치 방법 및 AWS ECR 설정

# API 서버 자동 배포 시 용량 문제로 서버가 돌아가지 않을때 Docker를 사용- 람다로 서버 배포시 라이브러리의 용량이 50MB까지밖에 제공되지 않음 # 도커 설치 필요 & 도커란?? - Docker는 소프트웨

sorktjrrb.tistory.com

 

# Dockerfile 과 .dockerignore 파일을 생성하고, serverless.yml, requirements.txt 수정하여 작성

- 파일을 생성할때 오타나 대소문자가 틀리면 잘인식되지 않는다! 파일이 인식되는지 아이콘이 변화되었는지로 확인하고 작성해야함

 

- Dockerfile 파일

 
# Docker 메뉴얼대로 작성하는것!
FROM public.ecr.aws/lambda/python:3.10
COPY . ${LAMBDA_TASK_ROOT}
COPY requirements.txt .

RUN yum -y install gcc
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

CMD ["app.handler"]
 

 

- .dockerignore 파일

 
__pycache__/
.git/
.serverless/
.gitignore
.dockerignore
serverless.yml
 

 

- serverless.yml 파일 (용량을 줄이기 위해 파이썬등 몇가지는 도커로 옮기고 여기선 삭제)

 
service: aws-movie-server

frameworkVersion: '3'

custom:
  wsgi:
    app: app.app

provider:
  name: aws
  region : ap-northeast-2
  ecr:
    images:
      appimage:
        path: ./

functions:
  api:
    image:
      name: appimage
    timeout: 30
    events:
      - httpApi: '*'
 

 

- requirements.txt 파일 (Colab 구문을 사용하였기 때문에, numpy와 pandas도 꼭 적어주어야함)

 
Flask==1.1.4
Werkzeug==1.0.1
markupsafe==2.0.1

serverless-wsgi

flask-restful
mysql-connector-python
psycopg-binary
passlib
flask-jwt-extended
email-validator

numpy
pandas
 

 

# 이상태로 sls deploy를 다시 실행하면 서버 배포를 확인할수 있다.

- 이후에 Git Actions를 사용해서 자동화까지 설정!

- 참고 : https://sorktjrrb.tistory.com/156

 

Docker : (3) CI/CD(서버 자동 배포)를 위한 Git Actions 설정

# 이제 로컬 환경에선 테스트가 끝났고 서버에 자동 배포를 진행해 보자- 방식은 이전에 Lambda 자동배포와 거의 동일함 Access Key 생성도 필요- 참고 : https://sorktjrrb.tistory.com/151 # 이전장까지 하여

sorktjrrb.tistory.com

 

다음 게시글로 계속~!

 

반응형