|
from flask import Flask, render_template, request, redirect, url_for, session, flash |
|
import pandas as pd |
|
import joblib |
|
from fuzzywuzzy import process |
|
from flask_bcrypt import Bcrypt |
|
from functools import wraps |
|
import os |
|
from supabase import create_client, Client |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load environment configuration (via python-dotenv) before the lookups below.
load_dotenv()

app = Flask(__name__)
# Flask secret key, taken from the SECRET_KEY environment variable.
app.secret_key = os.environ.get("SECRET_KEY")
bcrypt = Bcrypt(app)

# Supabase connection settings are read from the URL and KEY variables.
SUPABASE_URL = os.environ.get("URL")
SUPABASE_KEY = os.environ.get("KEY")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Movie catalogue and similarity lookup, loaded once at import time.
# Both paths are relative to the current working directory.
# NOTE(review): joblib.load unpickles the file; only load trusted artifacts.
movies = pd.read_csv('core/data/processed_movies_with_posters.csv')
cosine_sim = joblib.load('core/model/cosine_sim.pkl')
|
|
|
def login_required(f):
    """Decorate a view so it is only served to logged-in sessions.

    When the session has no ``logged_in`` key, a warning is flashed and the
    client is redirected to the ``login`` endpoint. Otherwise the wrapped
    view is called with its original arguments and its result is returned.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        flash('Please log in to access this page.', 'warning')
        return redirect(url_for('login'))

    return wrapper
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate submitted credentials.

    On POST the user row is fetched from the Supabase ``users`` table by
    username and the submitted password is checked against the stored hash.
    On success the session is marked as logged in and the client is
    redirected to ``home``. On failure a warning is flashed and the form is
    rendered again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    lookup = supabase.table('users').select('*').eq('username', username).execute()
    rows = lookup.data
    # Short-circuit keeps the hash check from running when no row matched.
    if rows and bcrypt.check_password_hash(rows[0]['password'], password):
        session['logged_in'] = True
        session['username'] = username
        return redirect(url_for('home'))

    flash('Invalid username or password.', 'warning')
    return render_template('login.html')
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the registration form and create a user on submission.

    On POST the password is bcrypt-hashed and a row holding ``username`` and
    the hash is inserted into the Supabase ``users`` table. A successful
    insert flashes a confirmation and redirects to ``login``. Every failure
    path flashes a warning and re-renders the form.
    """
    if request.method == 'POST':
        username = request.form.get('username', '')
        raw_password = request.form.get('password', '')

        # Reject blank credentials up front: Flask-Bcrypt raises ValueError
        # when asked to hash an empty password, which would otherwise escape
        # as an unhandled server error.
        if not username.strip() or not raw_password:
            flash('Username and password are required.', 'warning')
            return render_template('register.html')

        password = bcrypt.generate_password_hash(raw_password).decode('utf-8')

        try:
            response = supabase.table('users').insert(
                {"username": username, "password": password}
            ).execute()
        except Exception as e:
            # The client's exception types are not visible in this file, so
            # the handler stays broad and recognises duplicates by message.
            if "duplicate key value violates unique constraint" in str(e):
                flash('Username already exists', 'warning')
            else:
                # Log the details server-side instead of showing internal
                # error text to the end user.
                app.logger.exception('User registration failed')
                flash('An error occurred during registration. Please try again.', 'warning')
        else:
            if response.data:
                flash('Registration successful! You can now log in.', 'success')
                return redirect(url_for('login'))
            flash('Username already exists. Please choose a different one.', 'warning')

    return render_template('register.html')
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Drop all session data, flash a notice and redirect to the login page."""
    # Clear first: the flashed message is stored in the session, so it must
    # be added after the wipe to survive until the next request.
    session.clear()
    flash('You have been logged out.', 'info')
    login_url = url_for('login')
    return redirect(login_url)
|
|
|
|
|
def get_recommendations(title, cosine_sim=cosine_sim):
    """Return movies similar to *title*, or fuzzy title suggestions.

    Args:
        title: Movie title to look up; matched case-insensitively.
        cosine_sim: Similarity lookup indexed by movie row; defaults to the
            module-level object loaded at import time.

    Returns:
        ``(recommendations, None)`` when the title is found, where
        ``recommendations`` is a DataFrame of up to 10 rows of ``movies``.
        Otherwise ``(None, close_matches)``, where ``close_matches`` is a
        list of up to 5 rows (Series) whose titles fuzzily match *title*.
    """
    title = title.lower()
    # Lower-case the title column once instead of once per comparison.
    lowered_titles = movies['title'].str.lower()
    exact = lowered_titles == title

    if not exact.any():
        # Offer each distinct, non-missing title once so the same movie is
        # not suggested repeatedly and NaN never reaches the fuzzy matcher.
        choices = lowered_titles.dropna().unique().tolist()
        close_matches = process.extract(title, choices, limit=5)
        return None, [movies[lowered_titles == match[0]].iloc[0] for match in close_matches]

    # NOTE(review): the index label is used as a position into cosine_sim and
    # iloc, which assumes `movies` keeps its default 0..n-1 index -- confirm.
    idx = movies.index[exact][0]
    ranked = sorted(enumerate(cosine_sim[idx]), key=lambda pair: pair[1], reverse=True)
    # Exclude the queried movie by index rather than by assuming it sorts
    # first; ties (e.g. duplicate entries) can push it out of position 0.
    movie_indices = [i for i, _ in ranked if i != idx][:10]
    return movies.iloc[movie_indices], None
|
|
|
def get_recommendations_by_id(movie_id, cosine_sim=cosine_sim):
    """Return up to 5 movies most similar to the movie with id *movie_id*.

    Args:
        movie_id: Value to match against the ``id`` column of ``movies``.
        cosine_sim: Similarity lookup indexed by movie row; defaults to the
            module-level object loaded at import time.

    Returns:
        DataFrame with the rows of ``movies`` ranked most similar first.

    Raises:
        IndexError: If no movie has the given id.
    """
    matches = movies.index[movies['id'] == movie_id]
    if matches.empty:
        # Same exception type as the bare ``.index[0]`` lookup would raise,
        # but with a message that names the missing id.
        raise IndexError(f"No movie with id {movie_id!r}")

    # NOTE(review): the index label is used as a position into cosine_sim and
    # iloc, which assumes `movies` keeps its default 0..n-1 index -- confirm.
    idx = matches[0]
    ranked = sorted(enumerate(cosine_sim[idx]), key=lambda pair: pair[1], reverse=True)
    # Exclude the movie itself by index rather than by assuming it sorts
    # first; ties (e.g. duplicate entries) can push it out of position 0.
    movie_indices = [i for i, _ in ranked if i != idx][:5]
    return movies.iloc[movie_indices]
|
|
|
@app.route('/')
@login_required
def home():
    """Render the landing page with a random selection of up to 20 movies."""
    # Cap the sample at the catalogue size: DataFrame.sample raises
    # ValueError when asked for more rows than exist (without replacement).
    sample_size = min(20, len(movies))
    picks = movies.sample(sample_size)
    return render_template('recommendation.html', movies=picks.to_dict(orient='records'))
|
|
|
@app.route('/movie/<int:id>')
@login_required
def movie_details(id):
    """Render the detail page for one movie plus its similar movies.

    Args:
        id: Movie id captured from the URL. The parameter name must match
            the ``<int:id>`` route variable, hence the builtin-shadowing name.

    Responds with HTTP 404 when no movie has the requested id.
    """
    # Local import keeps this fix self-contained; the module-level flask
    # import list does not include ``abort``.
    from flask import abort

    matches = movies[movies['id'] == id]
    if matches.empty:
        # An unknown id is a client error, not a server fault: return 404
        # instead of letting ``.iloc[0]`` raise IndexError (HTTP 500).
        abort(404)

    movie = matches.iloc[0]
    recommendations = get_recommendations_by_id(id).to_dict(orient='records')
    return render_template('movie_details.html', movie=movie, recommendations=recommendations)
|
|
|
@app.route('/recommend', methods=['POST'])
@login_required
def recommend():
    """Render recommendations for the submitted title.

    When the title is not found, a warning is flashed and the closest
    matching movies are rendered instead.
    """
    found, suggestions = get_recommendations(request.form['title'])

    if found is not None:
        return render_template('recommendation.html', movies=found.to_dict(orient='records'))

    flash("Movie title not found. Did you mean one of these?", 'warning')
    cards = [row.to_dict() for row in suggestions]
    return render_template('recommendation.html', movies=cards)
|
|
|
@app.route('/search', methods=['GET', 'POST'])
@login_required
def search():
    """Render the search page; on POST, list movies whose title contains the query.

    Matching is a case-insensitive literal substring test. A GET renders the
    page with ``movies=None``.
    """
    if request.method != 'POST':
        return render_template('search.html', movies=None)

    query = request.form['query']
    # regex=False: the query is untrusted user text. Interpreting it as a
    # regular expression makes inputs such as "(" or "[" raise re.error and
    # exposes the server to pathological patterns.
    hits = movies[movies['title'].str.contains(query, case=False, na=False, regex=False)]
    return render_template('search.html', movies=hits.to_dict(orient='records'))
|
|
|
@app.route('/filter', methods=['GET', 'POST'])
@login_required
def filter():
    """Render the filter page; on POST, list movies matching genre and/or language.

    The genre and language choices offered to the template are derived from
    the ``genre`` (comma-separated) and ``original_language`` columns of
    ``movies``. On POST, up to 50 randomly sampled matching movies are
    rendered. When no filter is selected, or nothing matches, an error
    message is passed to the template instead.

    The function name shadows the ``filter`` builtin but is kept because it
    is also the Flask endpoint name for this route.
    """
    genres = sorted(movies['genre'].str.split(',', expand=True).stack().dropna().unique())
    languages = sorted(movies['original_language'].dropna().unique())
    no_results = "No movies found. Please adjust your filters."

    def render(movie_rows, error_message=None):
        # Single place that supplies the template's four context variables.
        return render_template('filter.html', movies=movie_rows, genres=genres,
                               languages=languages, error_message=error_message)

    if request.method != 'POST':
        return render(None)

    selected_genre = request.form.get('genre')
    selected_language = request.form.get('language')

    if not selected_genre and not selected_language:
        return render(None, no_results)

    # Build one boolean mask over the catalogue rather than copying the whole
    # frame and filtering it step by step.
    mask = pd.Series(True, index=movies.index)
    if selected_genre:
        # regex=False: the submitted value is untrusted and must be matched
        # literally, not interpreted as a regular expression.
        mask = mask & movies['genre'].str.contains(selected_genre, na=False, regex=False)
    if selected_language:
        mask = mask & (movies['original_language'] == selected_language)

    # Explicit copy so the column assignment below acts on an independent
    # frame and cannot trigger pandas' chained-assignment warning.
    filtered_movies = movies.loc[mask].copy()
    if filtered_movies.empty:
        # Tell the user why the page is empty instead of rendering nothing.
        return render([], no_results)

    filtered_movies['genre'] = filtered_movies['genre'].fillna('').astype(str)

    sample_size = min(50, len(filtered_movies))
    sampled = filtered_movies.sample(sample_size).to_dict(orient='records')
    return render(sampled)
|
|
|
if __name__ == '__main__':
    # Start Flask's built-in server on host 0.0.0.0, port 7860, when this
    # file is executed directly.
    app.run(port=7860, host='0.0.0.0')