Book / src /app.py
anwesh2410's picture
Upload 39 files
54862ee verified
raw
history blame
12.2 kB
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_login import (
LoginManager,
login_user,
login_required,
logout_user,
current_user,
UserMixin
)
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from pymongo import MongoClient
from recommend import find_top_common_books
from urllib.parse import quote_plus, unquote_plus
from bson.objectid import ObjectId
import requests
import logging
import os
app = Flask(__name__, template_folder='../templates', static_folder='../static')
app.secret_key = os.environ.get('SECRET_KEY', 'your_default_secret_key') # Replace with a secure key
# Initialize Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login' # Redirect to 'login' view if unauthorized
# Google Books API Key from environment variable
GOOGLE_BOOKS_API_KEY = 'AIzaSyBqTms_1DSR3xcxCapdmizZiZUMaswaI9M' # Set this in your environment
# Initialize MongoDB client
client = MongoClient("mongodb+srv://Atharva:[email protected]/book_dataset?retryWrites=true&w=majority")
db = client['book_dataset']
books_collection = db['BOOK']
reviews_collection = db['reviews'] # New Collection for Reviews
users_collection = db['users'] # New Collection for Users
# Simple in-memory cache for book details
book_details_cache = {}
def fetch_book_details(title, api_key=None):
"""Fetch book details from Google Books API based on the book title."""
if title in book_details_cache:
return book_details_cache[title]
base_url = "https://www.googleapis.com/books/v1/volumes/"
params = {
'q': title,
'maxResults': 1
}
if api_key:
params['key'] = api_key
try:
response = requests.get(base_url, params=params)
response.raise_for_status()
data = response.json()
if 'items' in data and len(data['items']) > 0:
volume_info = data['items'][0].get('volumeInfo', {})
book_details = {
'authors': volume_info.get('authors', ["Unknown Author"]),
'description': volume_info.get('description', "No description available."),
'averageRating': volume_info.get('averageRating', "N/A"),
'ratingsCount': volume_info.get('ratingsCount', "N/A"),
'publishedDate': volume_info.get('publishedDate', "N/A"),
'pageCount': volume_info.get('pageCount', "N/A"),
'language': volume_info.get('language', "N/A"),
'publisher': volume_info.get('publisher', "N/A")
}
book_details_cache[title] = book_details
return book_details
else:
book_details = {
'authors': ["Unknown Author"],
'description': "No description available.",
'averageRating': "N/A",
'ratingsCount': "N/A",
'publishedDate': "N/A",
'pageCount': "N/A",
'language': "N/A",
'publisher': "N/A"
}
book_details_cache[title] = book_details
return book_details
except requests.exceptions.RequestException as e:
print(f"Error fetching details for '{title}': {e}")
book_details = {
'authors': ["Unknown Author"],
'description': "No description available.",
'averageRating': "N/A",
'ratingsCount': "N/A",
'publishedDate': "N/A",
'pageCount': "N/A",
'language': "N/A",
'publisher': "N/A"
}
book_details_cache[title] = book_details
return book_details
# User Model
class User(UserMixin):
def __init__(self, user_data):
self.id = str(user_data['_id'])
self.username = user_data['username']
self.email = user_data['email']
@staticmethod
def get(user_id):
user_data = users_collection.find_one({'_id': ObjectId(user_id)})
if user_data:
return User(user_data)
return None
@login_manager.user_loader
def load_user(user_id):
return User.get(user_id)
# Logging Configuration
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create handlers
file_handler = logging.FileHandler('app.log')
stream_handler = logging.StreamHandler()
# Create formatters and add to handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
# Registration Route
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
# Get form data
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
# Validate form inputs
if not all([username, email, password, confirm_password]):
logger.warning('Please fill out all fields.')
flash('Please fill out all fields.', 'danger')
return redirect(url_for('register'))
if password != confirm_password:
logger.warning('Passwords do not match.')
flash('Passwords do not match.', 'danger')
return redirect(url_for('register'))
# Check if user already exists
existing_user = users_collection.find_one({'email': email})
if existing_user:
logger.warning('Email already registered.')
flash('Email already registered.', 'danger')
return redirect(url_for('register'))
# Hash the password
password_hash = generate_password_hash(password)
# Create new user document
new_user = {
'username': username,
'email': email,
'password': password_hash,
'created_at': datetime.utcnow()
}
# Insert the new user into the database
users_collection.insert_one(new_user)
logger.info('Registration successful!')
flash('Registration successful! Please log in.', 'success')
return redirect(url_for('login'))
return render_template('register.html')
# Login Route
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
# Get form data
email = request.form.get('email')
password = request.form.get('password')
# Find user by email
user_data = users_collection.find_one({'email': email})
if user_data and check_password_hash(user_data['password'], password):
user = User(user_data)
login_user(user)
logger.info('Logged in successfully!')
flash('Logged in successfully!', 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('homepage'))
else:
logger.warning('Invalid email or password.')
flash('Invalid email or password.', 'danger')
return redirect(url_for('login'))
return render_template('login.html')
# Logout Route
@app.route('/logout')
@login_required
def logout():
logout_user()
logger.info('You have been logged out.')
flash('You have been logged out.', 'success')
return redirect(url_for('homepage'))
# Contact Route
@app.route('/contact', methods=['GET', 'POST'])
def contact():
if request.method == 'POST':
# Retrieve form data
name = request.form.get('name')
email = request.form.get('email')
review = request.form.get('review')
rating = request.form.get('rating')
# Input Validation (Basic Example)
if not all([name, email, review, rating]):
logger.warning("All fields are required!")
flash("All fields are required!", "danger")
return redirect(url_for('contact'))
try:
# Convert rating to integer
rating = int(rating)
if rating < 1 or rating > 5:
raise ValueError("Rating must be between 1 and 5.")
except ValueError as ve:
logger.warning(str(ve))
flash(str(ve), "danger")
return redirect(url_for('contact'))
# Create a review document
review_document = {
"name": name,
"email": email,
"review": review,
"rating": rating,
"timestamp": datetime.utcnow() # Optional: Add timestamp
}
# Insert the review into the 'reviews' collection
reviews_collection.insert_one(review_document)
logger.info("Thank you for your feedback!")
flash("Thank you for your feedback!", "success")
return redirect(url_for('homepage'))
return render_template('contact.html')
# Homepage Route
@app.route('/')
def homepage():
return render_template('homepage.html')
# Recommendations Route
@app.route('/recommendations', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
book1 = request.form.get('book1')
book2 = request.form.get('book2')
book3 = request.form.get('book3')
input_books = []
for book in [book1, book2, book3]:
if book:
matched_books = search_books(book)
if matched_books:
input_books.append(matched_books[0]['title'])
if not input_books:
flash("Please enter at least one valid book.", "danger")
return redirect(url_for('index'))
recommendations = find_top_common_books(input_books)
logger.info('Recommendations generated successfully!')
# Flatten the recommendations list
flattened_recommendations = [
book for sublist in recommendations for book in sublist
]
# Pass the flattened list to the template with minimal information
return render_template(
'index.html',
recommendations=flattened_recommendations
)
return render_template('index.html')
# Book Detail Route
@app.route('/book/<title>')
def book_detail(title):
# Decode the title from URL
decoded_title = unquote_plus(title)
# Fetch book details (from cache or API)
details = fetch_book_details(decoded_title, GOOGLE_BOOKS_API_KEY)
# Fetch image_url from MongoDB if needed
book_in_db = books_collection.find_one({"Book-Title": decoded_title}, {"Image-URL-M": 1})
image_url = book_in_db["Image-URL-M"] if book_in_db else "/static/default.jpg"
enriched_book = {
'title': decoded_title,
'image_url': image_url,
'authors': details['authors'],
'description': details['description'],
'averageRating': details['averageRating'],
'ratingsCount': details['ratingsCount'],
'publishedDate': details['publishedDate'],
'pageCount': details['pageCount'],
'language': details['language'],
'publisher': details['publisher']
}
logger.info('Book details fetched successfully!')
return render_template('book_detail.html', book=enriched_book)
# Profile Route
@app.route('/profile')
@login_required
def profile():
return render_template('profile.html', user=current_user)
# Search Books Function
def search_books(title):
"""Search for book titles in MongoDB similar to the given title."""
query = {"Book-Title": {"$regex": str(title), "$options": "i"}}
matched_books = books_collection.find(query, {"Book-Title": 1, "Image-URL-M": 1}).limit(2)
return [{"title": book["Book-Title"], "image_url": book["Image-URL-M"]} for book in matched_books]
# Template Filters
@app.template_filter('url_encode')
def url_encode_filter(s):
return quote_plus(s)
@app.template_filter('url_decode')
def url_decode_filter(s):
return unquote_plus(s)
if __name__ == '__main__':
app.run(debug=True)