import streamlit as st
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
import faiss
import fitz  # PyMuPDF
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Set page config first
st.set_page_config(page_title="Smart Book Analyst", layout="wide")

# Configuration
MODEL_NAME = "ibm-granite/granite-3.1-1b-a400m-instruct"
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CHUNK_SIZE = 1024
CHUNK_OVERLAP = 100
MAX_SUMMARY_CHUNKS = 5
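
# Chunking note: ~1024-character chunks with 100 characters of overlap keep
# related sentences together and fit comfortably in the embedder's input
# window; only the first MAX_SUMMARY_CHUNKS chunks are summarized to bound
# generation time.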

@st.cache_resource
def load_models():
    """Load and cache the generation model, tokenizer, and sentence embedder."""
    try:
        # Left padding is the safer default for decoder-only generation
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            padding_side="left"
        )
        tokenizer.pad_token = tokenizer.eos_token
        # AutoModelForCausalLM resolves the correct architecture for the
        # Granite checkpoint (it is not a GPT-NeoX model)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            device_map="auto",
            torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        ).eval()
        # Sentence-transformer embedder for retrieval
        embedder = SentenceTransformer(EMBED_MODEL, device=DEVICE)
        embedder.max_seq_length = 512
        return tokenizer, model, embedder
    except Exception as e:
        st.error(f"Model loading failed: {str(e)}")
        st.stop()

tokenizer, model, embedder = load_models()

def process_text(text):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len
    )
    return splitter.split_text(text)

def extract_pdf_text(uploaded_file):
    try:
        doc = fitz.open(stream=uploaded_file.read(), filetype="pdf")
        return "\n".join(page.get_text() for page in doc)
    except Exception as e:
        st.error(f"PDF extraction error: {str(e)}")
        return ""

def generate_summary(text):
    """Summarize the first MAX_SUMMARY_CHUNKS chunks, then merge the partial
    summaries into one summary (map-reduce style)."""
    chunks = process_text(text)[:MAX_SUMMARY_CHUNKS]
    if not chunks:
        return "No meaningful content found."
    progress_bar = st.progress(0)
    summaries = []
    for i, chunk in enumerate(chunks):
        progress_bar.progress((i + 1) / len(chunks),
                              text=f"Processing section {i + 1}/{len(chunks)}...")
        prompt = f"""<|user|>
Summarize the key points from this text section in 3 bullet points:
{chunk[:1500]}
<|assistant|>
"""
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            max_length=1024,
            truncation=True
        ).to(DEVICE)
        outputs = model.generate(
            **inputs,
            max_new_tokens=200,
            temperature=0.3,
            top_p=0.9,
            repetition_penalty=1.1,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
        # Decode only the newly generated tokens so the prompt is not echoed back
        decoded = tokenizer.decode(
            outputs[0][inputs["input_ids"].shape[-1]:],
            skip_special_tokens=True
        ).strip()
        summaries.append(decoded)
    combined = "\n\n".join(summaries)
    final_prompt = f"""<|user|>
Combine these bullet points into a coherent 3-paragraph summary:
{combined}
<|assistant|>
Here is the comprehensive summary:"""
    inputs = tokenizer(final_prompt, return_tensors="pt").to(DEVICE)
    outputs = model.generate(
        **inputs,
        max_new_tokens=400,
        temperature=0.5,
        top_p=0.9,
        repetition_penalty=1.1,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id
    )
    return tokenizer.decode(
        outputs[0][inputs["input_ids"].shape[-1]:],
        skip_special_tokens=True
    ).strip()

def build_faiss_index(texts):
    embeddings = embedder.encode(
        texts,
        show_progress_bar=False,
        batch_size=16,
        convert_to_tensor=True
    ).cpu().numpy()
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatIP(dimension)
    faiss.normalize_L2(embeddings)
    index.add(embeddings)
    return index
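
# With L2-normalized embeddings, inner-product search (IndexFlatIP) is
# equivalent to cosine similarity, so index.search() returns scores in
# [-1, 1] where higher means more relevant. A minimal usage sketch (the
# values below are illustrative only):
#
#   idx = build_faiss_index(["chunk one", "chunk two"])
#   q = embedder.encode(["some question"])
#   faiss.normalize_L2(q)
#   scores, ids = idx.search(q, k=1)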

def generate_answer(query, context):
    prompt = f"""<|user|>
Based on this context:
{context[:2000]}
Answer this question concisely: {query}
<|assistant|>
"""
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        max_length=1024,
        truncation=True
    ).to(DEVICE)
    outputs = model.generate(
        **inputs,
        max_new_tokens=300,
        temperature=0.4,
        top_p=0.95,
        repetition_penalty=1.15,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
        no_repeat_ngram_size=3  # discourage repeated phrases
    )
    # Decode only the newly generated tokens so the prompt is not echoed back
    return tokenizer.decode(
        outputs[0][inputs["input_ids"].shape[-1]:],
        skip_special_tokens=True
    ).strip()

# Streamlit UI
st.title("AI-Powered Book Analysis System")
uploaded_file = st.file_uploader("Upload book (PDF or TXT)", type=["pdf", "txt"])

if uploaded_file:
    with st.spinner("Analyzing book content..."):
        try:
            if uploaded_file.type == "application/pdf":
                text = extract_pdf_text(uploaded_file)
            else:
                text = uploaded_file.read().decode()
            if not text.strip():
                st.error("Uploaded file is empty")
                st.stop()
            chunks = process_text(text)
            st.session_state.docs = chunks
            st.session_state.index = build_faiss_index(chunks)
            with st.expander("Book Summary", expanded=True):
                summary = generate_summary(text)
                st.write(summary)
        except Exception as e:
            st.error(f"Processing failed: {str(e)}")

if 'index' in st.session_state and st.session_state.index:
    query = st.text_input("Ask about the book:")
    if query:
        with st.spinner("Searching for answers..."):
            try:
                query_embed = embedder.encode([query])
                faiss.normalize_L2(query_embed)
                distances, indices = st.session_state.index.search(query_embed, k=3)
                context = "\n".join([st.session_state.docs[i] for i in indices[0]])
                answer = generate_answer(query, context)
                st.subheader("Answer")
                st.markdown(f"```\n{answer}\n```")
                st.caption(f"Confidence score: {distances[0][0]:.2f}")
            except Exception as e:
                st.error(f"Query failed: {str(e)}")