# CrimsonREwind's picture
# Upload 28 files
# 86112bd verified
# raw
# history blame
# 6.5 kB
import os
from functools import wraps

import joblib
import pandas as pd
from dotenv import load_dotenv
from flask import Flask, abort, flash, redirect, render_template, request, session, url_for
from flask_bcrypt import Bcrypt
from fuzzywuzzy import process
from supabase import Client, create_client
# Load variables from a .env file into the environment before any of the
# os.getenv() calls below read them.
load_dotenv()
app = Flask(__name__)
# Key Flask uses to sign the session cookie; os.getenv returns None when
# SECRET_KEY is not set.
app.secret_key = os.getenv("SECRET_KEY")
bcrypt = Bcrypt(app)
# Supabase connection settings are read from the environment variables named
# URL and KEY.
SUPABASE_URL = os.getenv("URL")
SUPABASE_KEY = os.getenv("KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# The movie table and the similarity data are loaded once at import time and
# shared by every request handler in this module.
movies = pd.read_csv('core/data/processed_movies_with_posters.csv')
# NOTE(review): joblib.load unpickles the file, so it must come from a trusted
# source. Presumably row i of cosine_sim corresponds to row i of `movies` --
# confirm against the script that produced the model.
cosine_sim = joblib.load('core/model/cosine_sim.pkl')
def login_required(f):
    """Decorate a view so that it only runs for a logged-in session.

    When the session has no ``logged_in`` key, a warning is flashed and the
    client is redirected to the ``login`` endpoint instead of calling *f*.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        flash('Please log in to access this page.', 'warning')
        return redirect(url_for('login'))

    return guarded_view
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, check the submitted credentials.

    The user row is looked up by username in the Supabase ``users`` table and
    the submitted password is verified against the stored bcrypt hash. On
    success the session is marked as logged in and the client is redirected
    to ``home``; otherwise a warning is flashed and the form is shown again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    lookup = supabase.table('users').select('*').eq('username', username).execute()
    rows = lookup.data
    if rows and bcrypt.check_password_hash(rows[0]['password'], password):
        session['logged_in'] = True
        session['username'] = username
        return redirect(url_for('home'))

    # One message covers both "unknown user" and "wrong password".
    flash('Invalid username or password.', 'warning')
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and, on POST, create a new user.

    The password is stored as a bcrypt hash in the Supabase ``users`` table.
    On success the client is redirected to ``login``; on any failure a
    warning is flashed and the registration form is rendered again.
    """
    if request.method == 'POST':
        username = request.form.get('username', '')
        raw_password = request.form.get('password', '')
        # Refuse blank credentials instead of storing an empty username or
        # the hash of an empty password.
        if not username or not raw_password:
            flash('Username and password are required.', 'warning')
            return render_template('register.html')

        password = bcrypt.generate_password_hash(raw_password).decode('utf-8')
        try:
            response = supabase.table('users').insert(
                {"username": username, "password": password}
            ).execute()
        except Exception as e:
            if "duplicate key value violates unique constraint" in str(e):
                flash('Username already exists. Please choose a different one.', 'warning')
            else:
                # Keep backend error details in the server log; showing them
                # to the visitor would leak internals.
                app.logger.exception("User registration failed")
                flash('An error occurred during registration. Please try again later.', 'warning')
        else:
            if response.data:
                flash('Registration successful! You can now log in.', 'success')
                return redirect(url_for('login'))
            # An insert that returns no rows is treated as a rejected username.
            flash('Username already exists. Please choose a different one.', 'warning')
    return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
    """Clear the session and send the user back to the login page."""
    session.clear()
    # Flash after clearing so the message is not wiped with the old session.
    flash('You have been logged out.', 'info')
    login_url = url_for('login')
    return redirect(login_url)
def get_recommendations(title, cosine_sim=cosine_sim):
    """Return the movies most similar to the one called *title*.

    The title is compared case-insensitively with the ``title`` column of the
    module-level ``movies`` frame.

    Args:
        title: Movie title to look up.
        cosine_sim: Similarity data indexed as ``cosine_sim[row][other_row]``;
            defaults to the matrix loaded at import time.

    Returns:
        ``(recommendations, None)`` when the title exists, where
        ``recommendations`` is a DataFrame of up to ten rows of ``movies``
        ordered by descending similarity, or ``(None, close_matches)`` when it
        does not, where ``close_matches`` is a list of up to five rows (Series)
        chosen by fuzzywuzzy's ``process.extract``.
    """
    title = title.lower()
    # Lower-case the column once and reuse it for every comparison below.
    lowered_titles = movies['title'].str.lower()
    exact = lowered_titles == title

    if not exact.any():
        # Missing titles cannot be scored, and duplicate titles would fill
        # several suggestion slots with the same movie.
        candidates = lowered_titles.dropna().unique().tolist()
        close_matches = process.extract(title, candidates, limit=5)
        return None, [movies[lowered_titles == match[0]].iloc[0] for match in close_matches]

    idx = movies[exact].index[0]
    sim_scores = sorted(enumerate(cosine_sim[idx]), key=lambda pair: pair[1], reverse=True)
    # Exclude the query movie by index instead of assuming it sorts first:
    # another row with an equal score could otherwise push it into the results.
    # (Like the iloc call below, this relies on index labels equalling row
    # positions, which holds for the default index produced by read_csv.)
    movie_indices = [i for i, _ in sim_scores if i != idx][:10]
    return movies.iloc[movie_indices], None
def get_recommendations_by_id(movie_id, cosine_sim=cosine_sim):
    """Return the five movies most similar to the movie with id *movie_id*.

    Args:
        movie_id: Value to look up in the ``id`` column of ``movies``.
        cosine_sim: Similarity data indexed as ``cosine_sim[row][other_row]``;
            defaults to the matrix loaded at import time.

    Returns:
        A DataFrame of up to five rows of ``movies`` ordered by descending
        similarity, never including the queried movie itself.

    Raises:
        IndexError: If no row of ``movies`` has the given id.
    """
    matches = movies.index[movies['id'] == movie_id]
    if len(matches) == 0:
        # Same exception type as the bare ``index[0]`` lookup would raise,
        # but with a message that names the missing id.
        raise IndexError(f"no movie with id {movie_id!r}")
    idx = matches[0]

    sim_scores = sorted(enumerate(cosine_sim[idx]), key=lambda pair: pair[1], reverse=True)
    # Exclude the query movie by index instead of assuming it sorts first:
    # another row with an equal score could otherwise push it into the results.
    movie_indices = [i for i, _ in sim_scores if i != idx][:5]
    return movies.iloc[movie_indices]
@app.route('/')
@login_required
def home():
    """Render the landing page with a random selection of up to 20 movies."""
    # DataFrame.sample raises ValueError when asked for more rows than the
    # frame holds, so clamp the request to the catalogue size.
    sample_size = min(20, len(movies))
    picks = movies.sample(sample_size)
    return render_template('recommendation.html', movies=picks.to_dict(orient='records'))
@app.route('/movie/<int:id>')
@login_required
def movie_details(id):
    """Render the detail page for one movie together with similar titles.

    Args:
        id: Value of the ``id`` column identifying the movie. The parameter
            name is fixed by the ``<int:id>`` URL rule.

    Responds with HTTP 404 when no movie has that id.
    """
    matches = movies[movies['id'] == id]
    if matches.empty:
        # An unknown id is a client error, not a server crash.
        abort(404)
    movie = matches.iloc[0]
    recommendations = get_recommendations_by_id(id).to_dict(orient='records')
    return render_template('movie_details.html', movie=movie, recommendations=recommendations)
@app.route('/recommend', methods=['POST'])
@login_required
def recommend():
    """Render recommendations for the submitted title.

    When the title is not found, a warning is flashed and the closest
    matching movies are shown in place of recommendations.
    """
    title = request.form['title']
    recommendations, close_matches = get_recommendations(title)

    if recommendations is not None:
        found = recommendations.to_dict(orient='records')
        return render_template('recommendation.html', movies=found)

    flash("Movie title not found. Did you mean one of these?", 'warning')
    suggestions = [match.to_dict() for match in close_matches]
    return render_template('recommendation.html', movies=suggestions)
@app.route('/search', methods=['GET', 'POST'])
@login_required
def search():
    """Show the search form and, on POST, list movies whose title contains the query.

    Matching is a case-insensitive substring test; rows without a title never
    match.
    """
    if request.method == 'POST':
        query = request.form['query']
        # regex=False: match the user's text literally. With the default
        # regex matching, input such as "(" is an invalid pattern and raises.
        title_matches = movies['title'].str.contains(query, case=False, na=False, regex=False)
        results = movies[title_matches]
        return render_template('search.html', movies=results.to_dict(orient='records'))
    return render_template('search.html', movies=None)
@app.route('/filter', methods=['GET', 'POST'])
@login_required
def filter():
    """Show the filter form and, on POST, list movies by genre and/or language.

    The genre and language choices offered by the form are derived from the
    ``genre`` (comma-separated) and ``original_language`` columns of
    ``movies``. At most 50 randomly sampled matches are rendered. An error
    message is shown when no filter was chosen or when nothing matches.
    """
    genres = sorted(movies['genre'].str.split(',', expand=True).stack().dropna().unique())
    languages = sorted(movies['original_language'].dropna().unique())
    no_match_message = "No movies found. Please adjust your filters."

    if request.method != 'POST':
        return render_template('filter.html', movies=None, genres=genres, languages=languages,
                               error_message=None)

    selected_genre = request.form.get('genre')
    selected_language = request.form.get('language')
    if not selected_genre and not selected_language:
        return render_template('filter.html', movies=None, genres=genres, languages=languages,
                               error_message=no_match_message)

    filtered_movies = movies
    if selected_genre:
        # regex=False: the submitted genre is matched literally, so regex
        # metacharacters in form input cannot change or break the match.
        genre_mask = filtered_movies['genre'].str.contains(selected_genre, na=False, regex=False)
        filtered_movies = filtered_movies[genre_mask]
    if selected_language:
        filtered_movies = filtered_movies[filtered_movies['original_language'] == selected_language]

    # assign() builds a new frame, so neither the shared `movies` table nor a
    # filtered slice of it is written to.
    filtered_movies = filtered_movies.assign(genre=filtered_movies['genre'].fillna('').astype(str))

    sample_size = min(50, len(filtered_movies))
    records = filtered_movies.sample(sample_size).to_dict(orient='records')
    # Tell the user when the chosen filters match nothing instead of
    # rendering an empty list with no explanation.
    error_message = None if records else no_match_message
    return render_template('filter.html', movies=records, genres=genres, languages=languages,
                           error_message=error_message)
# Start the built-in server only when this file is executed directly.
if __name__ == '__main__':
    # Listen on every interface, port 7860.
    app.run(port=7860, host='0.0.0.0')