|
from flask import Flask, render_template, request, redirect, url_for, flash |
|
from flask_login import ( |
|
LoginManager, |
|
login_user, |
|
login_required, |
|
logout_user, |
|
current_user, |
|
UserMixin |
|
) |
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
from datetime import datetime |
|
from pymongo import MongoClient |
|
from recommend import find_top_common_books |
|
from urllib.parse import quote_plus, unquote_plus |
|
from bson.objectid import ObjectId |
|
import requests |
|
import logging |
|
import os |
|
|
|
# Flask application; templates and static assets live one directory above
# this module.
app = Flask(__name__, template_folder='../templates', static_folder='../static')

# The secret key signs the session cookie, so it must never fall back to a
# value that is published in the source tree (anyone could forge sessions).
app.secret_key = os.environ.get('SECRET_KEY')
if not app.secret_key:
    # No key configured: use a random per-process key. Sessions will not
    # survive a restart or be shared between worker processes, so set
    # SECRET_KEY in any real deployment.
    app.secret_key = os.urandom(32)
    logging.getLogger(__name__).warning(
        'SECRET_KEY is not set; using a temporary random key for this process.'
    )
|
|
|
|
|
# Flask-Login integration. Anonymous visitors hitting a @login_required view
# are sent to the endpoint named by ``login_view``.
login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.init_app(app)
|
|
|
|
|
# Google Books API key, read from the environment instead of being committed
# to source control. When the variable is unset (or empty) this is None, and
# fetch_book_details() then sends the request without a ``key`` parameter.
# NOTE(review): the key that used to be hard-coded here should be revoked, and
# keyless requests are presumably subject to a lower quota - confirm.
GOOGLE_BOOKS_API_KEY = os.environ.get('GOOGLE_BOOKS_API_KEY') or None
|
|
|
|
|
# MongoDB connection. The URI can be supplied through the MONGODB_URI
# environment variable so that credentials do not have to live in the code.
# NOTE(review): the literal fallback below is kept only so existing
# deployments keep working; it embeds credentials and should be rotated and
# removed once MONGODB_URI is set everywhere.
client = MongoClient(os.environ.get(
    'MONGODB_URI',
    "mongodb+srv://Atharva:[email protected]/book_dataset?retryWrites=true&w=majority",
))

db = client['book_dataset']

# Collections used by the views in this module.
books_collection = db['BOOK']
reviews_collection = db['reviews']
users_collection = db['users']
|
|
|
|
|
# Maximum number of titles kept in ``book_details_cache``. Titles come from
# request URLs, so without a bound the cache could grow indefinitely.
_BOOK_DETAILS_CACHE_MAX = 1024

# In-process cache: book title -> details dict returned by fetch_book_details().
book_details_cache = {}


def _default_book_details():
    """Return a fresh placeholder details dict used when no data is available."""
    return {
        'authors': ["Unknown Author"],
        'description': "No description available.",
        'averageRating': "N/A",
        'ratingsCount': "N/A",
        'publishedDate': "N/A",
        'pageCount': "N/A",
        'language': "N/A",
        'publisher': "N/A",
    }


def fetch_book_details(title, api_key=None, timeout=10):
    """Fetch book details from Google Books API based on the book title.

    Successful lookups (including "no match") are cached per title in
    ``book_details_cache``. Failed requests are NOT cached, so a transient
    network error does not hide a book's details until the next restart.

    Args:
        title: Book title used as the search query and as the cache key.
        api_key: Optional API key; when falsy the request is sent without one.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A dict with the keys ``authors``, ``description``, ``averageRating``,
        ``ratingsCount``, ``publishedDate``, ``pageCount``, ``language`` and
        ``publisher``. Fields missing from the response (or every field, when
        the lookup fails or finds nothing) hold placeholder values.
    """
    if title in book_details_cache:
        return book_details_cache[title]

    base_url = "https://www.googleapis.com/books/v1/volumes/"
    params = {'q': title, 'maxResults': 1}
    if api_key:
        params['key'] = api_key

    try:
        # Without a timeout a stalled connection would block the request
        # handler indefinitely.
        response = requests.get(base_url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        logging.getLogger(__name__).warning(
            "Error fetching details for '%s': %s", title, e
        )
        return _default_book_details()

    defaults = _default_book_details()
    items = data.get('items') if isinstance(data, dict) else None
    if items:
        volume_info = items[0].get('volumeInfo', {})
        book_details = {
            field: volume_info.get(field, fallback)
            for field, fallback in defaults.items()
        }
    else:
        book_details = defaults

    if len(book_details_cache) >= _BOOK_DETAILS_CACHE_MAX:
        # Dicts preserve insertion order, so the first key is the oldest entry.
        book_details_cache.pop(next(iter(book_details_cache)), None)
    book_details_cache[title] = book_details
    return book_details
|
|
|
|
|
class User(UserMixin):
    """Login-session user built from a document of the ``users`` collection."""

    def __init__(self, user_data):
        """Copy the identifying fields out of a user document.

        Args:
            user_data: Mapping with ``_id``, ``username`` and ``email`` keys.
        """
        # Kept as a string; get() converts it back to an ObjectId for lookups.
        self.id = str(user_data['_id'])
        self.username = user_data['username']
        self.email = user_data['email']

    @staticmethod
    def get(user_id):
        """Load a user by id.

        Args:
            user_id: String form of the user's ObjectId.

        Returns:
            A ``User`` for the matching document, or ``None`` when the id is
            malformed or no such user exists.
        """
        # A stale or tampered id must yield "no user" rather than letting
        # ObjectId() raise.
        if not ObjectId.is_valid(user_id):
            return None

        user_data = users_collection.find_one({'_id': ObjectId(user_id)})
        if user_data:
            return User(user_data)
        return None
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """User-loader callback: resolve *user_id* through ``User.get``."""
    user = User.get(user_id)
    return user
|
|
|
|
|
# Module logger: everything from DEBUG upwards goes both to ``app.log`` and to
# the default stream, using one shared line format.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# File output.
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Console output.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the sign-up form (GET) or create an account (POST).

    A POST is rejected - with a flashed message and a redirect back to this
    view - when a field is empty, the two passwords differ, or the email is
    already present in the users collection. Otherwise the user is stored
    with a hashed password and redirected to the login view.
    """
    if request.method != 'POST':
        return render_template('register.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    confirm_password = form.get('confirm_password')

    # First failing check wins; the database is only queried once the cheap
    # checks have passed.
    problem = None
    if not (username and email and password and confirm_password):
        problem = 'Please fill out all fields.'
    elif password != confirm_password:
        problem = 'Passwords do not match.'
    elif users_collection.find_one({'email': email}):
        problem = 'Email already registered.'

    if problem is not None:
        logger.warning(problem)
        flash(problem, 'danger')
        return redirect(url_for('register'))

    # Only the hash of the password is persisted.
    users_collection.insert_one({
        'username': username,
        'email': email,
        'password': generate_password_hash(password),
        'created_at': datetime.utcnow(),
    })

    logger.info('Registration successful!')
    flash('Registration successful! Please log in.', 'success')
    return redirect(url_for('login'))
|
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True only when *target* is a path on this site."""
    from urllib.parse import urlsplit

    if not target or not target.startswith('/') or target.startswith('//'):
        return False
    # Browsers may treat backslashes like slashes and drop control characters,
    # either of which could turn the value into a protocol-relative URL.
    if '\\' in target or any(ord(ch) < 32 for ch in target):
        return False
    parts = urlsplit(target)
    return not parts.scheme and not parts.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials.

    On success the user is logged in and redirected to the ``next`` query
    parameter when it is a same-site path, otherwise to the homepage. On
    failure a message is flashed and the login view is shown again.
    """
    if request.method == 'POST':
        email = request.form.get('email')
        password = request.form.get('password')

        # Skip the lookup when a field is missing: a None email could match
        # documents lacking the field, and a None password would be handed to
        # check_password_hash.
        user_data = None
        if email and password:
            user_data = users_collection.find_one({'email': email})

        if user_data and check_password_hash(user_data['password'], password):
            login_user(User(user_data))
            logger.info('Logged in successfully!')
            flash('Logged in successfully!', 'success')

            # ``next`` is attacker-controllable; never redirect off-site.
            next_page = request.args.get('next')
            if not _is_safe_redirect_target(next_page):
                next_page = url_for('homepage')
            return redirect(next_page)

        logger.warning('Invalid email or password.')
        flash('Invalid email or password.', 'danger')
        return redirect(url_for('login'))

    return render_template('login.html')
|
|
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the homepage."""
    logout_user()

    farewell = 'You have been logged out.'
    logger.info(farewell)
    flash(farewell, 'success')

    return redirect(url_for('homepage'))
|
|
|
|
|
@app.route('/contact', methods=['GET', 'POST'])
def contact():
    """Show the feedback form (GET) or store a submitted review (POST).

    A POST needs non-empty ``name``, ``email``, ``review`` and ``rating``
    fields, and the rating must be a whole number from 1 to 5. Invalid
    submissions flash a message and redirect back to this view; valid ones
    are inserted into the reviews collection and redirect to the homepage.
    """
    if request.method == 'POST':
        name = request.form.get('name')
        email = request.form.get('email')
        review = request.form.get('review')
        rating = request.form.get('rating')

        if not all([name, email, review, rating]):
            logger.warning("All fields are required!")
            flash("All fields are required!", "danger")
            return redirect(url_for('contact'))

        try:
            rating = int(rating)
        except ValueError:
            # Non-numeric input gets the same friendly message as an
            # out-of-range value instead of the raw int() error text.
            rating = None

        if rating is None or not 1 <= rating <= 5:
            message = "Rating must be between 1 and 5."
            logger.warning(message)
            flash(message, "danger")
            return redirect(url_for('contact'))

        review_document = {
            "name": name,
            "email": email,
            "review": review,
            "rating": rating,
            "timestamp": datetime.utcnow(),
        }
        reviews_collection.insert_one(review_document)

        logger.info("Thank you for your feedback!")
        flash("Thank you for your feedback!", "success")
        return redirect(url_for('homepage'))

    return render_template('contact.html')
|
|
|
|
|
@app.route('/')
def homepage():
    """Render the landing page template."""
    landing_page = render_template('homepage.html')
    return landing_page
|
|
|
|
|
@app.route('/recommendations', methods=['GET', 'POST'])
def index():
    """Show the recommendation form (GET) or its results (POST).

    Each of the up to three submitted titles is resolved to the first match
    returned by ``search_books``; the resolved titles are passed to
    ``find_top_common_books`` and the flattened result is rendered.
    """
    if request.method != 'POST':
        return render_template('index.html')

    input_books = []
    for field in ('book1', 'book2', 'book3'):
        typed_title = request.form.get(field)
        if not typed_title:
            continue
        candidates = search_books(typed_title)
        if candidates:
            # Only the best (first) match is used for each input.
            input_books.append(candidates[0]['title'])

    if not input_books:
        flash("Please enter at least one valid book.", "danger")
        return redirect(url_for('index'))

    recommendations = find_top_common_books(input_books)
    logger.info('Recommendations generated successfully!')

    # Collapse the per-group results into a single flat list.
    flattened_recommendations = []
    for group in recommendations:
        flattened_recommendations.extend(group)

    return render_template('index.html', recommendations=flattened_recommendations)
|
|
|
|
|
@app.route('/book/<title>')
def book_detail(title):
    """Render the detail page for one book.

    Details come from ``fetch_book_details``; the cover image comes from the
    books collection, falling back to a default image when the book is not
    stored there or its document has no usable ``Image-URL-M`` value.

    Args:
        title: Book title from the URL; ``+`` and percent escapes are decoded
            before use (presumably links are built with the ``url_encode``
            template filter - TODO confirm).
    """
    decoded_title = unquote_plus(title)

    details = fetch_book_details(decoded_title, GOOGLE_BOOKS_API_KEY)

    book_in_db = books_collection.find_one(
        {"Book-Title": decoded_title}, {"Image-URL-M": 1}
    )
    # .get() instead of indexing: a matched document without the image field
    # must not turn into a KeyError.
    image_url = (book_in_db or {}).get("Image-URL-M") or "/static/default.jpg"

    enriched_book = {
        'title': decoded_title,
        'image_url': image_url,
        'authors': details['authors'],
        'description': details['description'],
        'averageRating': details['averageRating'],
        'ratingsCount': details['ratingsCount'],
        'publishedDate': details['publishedDate'],
        'pageCount': details['pageCount'],
        'language': details['language'],
        'publisher': details['publisher'],
    }

    logger.info('Book details fetched successfully!')
    return render_template('book_detail.html', book=enriched_book)
|
|
|
|
|
@app.route('/profile')
@login_required
def profile():
    """Render the profile page for the logged-in user."""
    page = render_template('profile.html', user=current_user)
    return page
|
|
|
|
|
def search_books(title):
    """Search for book titles in MongoDB containing the given text.

    The match is a case-insensitive substring match. The input is escaped
    before being used as a regular expression, so characters such as ``+``,
    ``(`` or ``*`` are matched literally instead of being interpreted as
    pattern syntax.

    Args:
        title: Text typed by the user.

    Returns:
        A list of at most two dicts with ``title`` and ``image_url`` keys;
        ``image_url`` is None when the document has no ``Image-URL-M`` field.
    """
    import re  # local import: only this function needs it

    query = {"Book-Title": {"$regex": re.escape(str(title)), "$options": "i"}}
    matched_books = books_collection.find(
        query, {"Book-Title": 1, "Image-URL-M": 1}
    ).limit(2)
    return [
        {"title": book["Book-Title"], "image_url": book.get("Image-URL-M")}
        for book in matched_books
    ]
|
|
|
|
|
@app.template_filter('url_encode')
def url_encode_filter(s):
    """Template filter ``url_encode``: return ``quote_plus(s)``."""
    encoded = quote_plus(s)
    return encoded
|
|
|
@app.template_filter('url_decode')
def url_decode_filter(s):
    """Template filter ``url_decode``: return ``unquote_plus(s)``."""
    decoded = unquote_plus(s)
    return decoded
|
|
|
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must not be reachable
    # outside development. It is therefore opt-in: set FLASK_DEBUG=1 (or
    # true/yes/on) to enable it when running this file directly.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on'
    )
    app.run(debug=debug_enabled)