"""Minimal retrieval-augmented generation (RAG) demo over two PDF documents.

Pipeline: read the PDFs, split them into fixed-size word chunks, embed the
chunks, store the embeddings in a FAISS index, retrieve the chunks closest to
a query (optionally re-ranked with MMR), and summarise them with BART behind
a small Gradio UI.
"""

import faiss
import gradio as gr
import numpy as np
import PyPDF2
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

# Load the LLM for generation
generation_model_name = 'facebook/bart-large-cnn'
generation_model = AutoModelForSeq2SeqLM.from_pretrained(generation_model_name)
tokenizer = AutoTokenizer.from_pretrained(generation_model_name)

# Specify file paths
file_path1 = './AST-1.pdf'
file_path2 = './AST-2.pdf'

# Number of FAISS neighbours fetched per requested result. The index mixes
# chunks from every document, so we over-fetch and filter by document label
# afterwards; this also gives MMR a pool larger than top_k to choose from.
CANDIDATE_POOL_FACTOR = 5


# Step 1: Load the document files
def read_pdf(file_path):
    """Return the text of every page of the PDF at *file_path*.

    Pages are joined with a newline so the last word of one page is not glued
    to the first word of the next. Pages for which no text can be extracted
    contribute an empty string.
    """
    with open(file_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        return '\n'.join(page.extract_text() or '' for page in reader.pages)


ast1_text = read_pdf(file_path1)
ast2_text = read_pdf(file_path2)


# Step 2: Split the loaded documents into chunks
# Split by fixed number of words:
def chunk_text(text, chunk_size=200):
    """Split *text* into chunks of at most *chunk_size* whitespace-separated words.

    Raises:
        ValueError: if *chunk_size* is not a positive integer.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    words = text.split()
    return [
        ' '.join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]


ast1_chunks = chunk_text(ast1_text, chunk_size=100)
ast2_chunks = chunk_text(ast2_text, chunk_size=150)

# Label the chunks with the document they came from
ast1_chunks = [(chunk, 'AST-1') for chunk in ast1_chunks]
ast2_chunks = [(chunk, 'AST-2') for chunk in ast2_chunks]
all_chunks = ast1_chunks + ast2_chunks
print('Created the chunks')

# Load the embedding model
# Load the pre-trained model from the MTEB leaderboard
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Embed only the chunk text. Passing the (text, label) tuples would make the
# encoder treat each item as a text pair and mix the label into the vector.
embeddings = embedding_model.encode(
    [chunk for chunk, _ in all_chunks],
    convert_to_tensor=True,
)

# Convert embeddings to a contiguous float32 numpy array, as FAISS expects
embeddings_np = np.ascontiguousarray(embeddings.cpu().numpy(), dtype=np.float32)

# Create a FAISS index; row i of embeddings_np corresponds to all_chunks[i]
dimension = embeddings_np.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings_np)

# Save the index
faiss.write_index(index, 'embeddings_index.faiss')

# Load FAISS index
stored_index = faiss.read_index('./embeddings_index.faiss')
print('Stored embedding in db')


def _mmr_select(query_matrix, candidate_ids, top_k, diversity):
    """Pick up to *top_k* ids from *candidate_ids* by maximal marginal relevance.

    Each step selects the candidate maximising
    ``(1 - diversity) * sim(candidate, query) - diversity * max sim(candidate, selected)``,
    where ``sim`` is cosine similarity. Both terms are similarities (higher is
    better), so ``argmax`` prefers relevant, non-redundant chunks.
    """
    if not candidate_ids:
        return []

    # Imported lazily so scikit-learn is only needed when MMR is requested.
    from sklearn.metrics.pairwise import cosine_similarity

    candidate_vectors = embeddings_np[candidate_ids]
    relevance = cosine_similarity(candidate_vectors, query_matrix).ravel()
    pairwise = cosine_similarity(candidate_vectors)

    selected = []
    remaining = list(range(len(candidate_ids)))
    while remaining and len(selected) < top_k:
        if selected:
            redundancy = pairwise[np.ix_(remaining, selected)].max(axis=1)
        else:
            # Nothing selected yet, so no candidate is redundant.
            redundancy = np.zeros(len(remaining))
        scores = (1 - diversity) * relevance[remaining] - diversity * redundancy
        selected.append(remaining.pop(int(np.argmax(scores))))
    return [candidate_ids[position] for position in selected]


# Function to retrieve chunks
def retrieve_chunks(query, top_k=10, use_mmr=False, diversity=0.5, target_doc='AST-1'):
    """Return up to *top_k* chunk texts from *target_doc* that best match *query*.

    Args:
        query: Natural-language query string.
        top_k: Maximum number of chunks to return.
        use_mmr: Re-rank the candidate pool with MMR instead of taking the
            nearest neighbours in distance order.
        diversity: MMR trade-off in [0, 1]; 0 is pure relevance, 1 is pure
            diversity. Ignored unless *use_mmr* is true.
        target_doc: Document label a chunk must carry to be returned.

    Returns:
        A list of chunk strings, best first. It may be shorter than *top_k*
        if the candidate pool holds fewer matching chunks.
    """
    if top_k <= 0:
        return []
    pool_size = min(stored_index.ntotal, top_k * CANDIDATE_POOL_FACTOR)
    if pool_size <= 0:
        return []

    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().numpy()
    query_matrix = np.asarray(query_embedding, dtype=np.float32).reshape(1, -1)
    _, indices = stored_index.search(query_matrix, pool_size)

    # FAISS pads missing results with -1; skip those so they do not silently
    # index the last chunk. Results arrive sorted by increasing distance.
    candidate_ids = []
    for raw_id in indices[0]:
        chunk_id = int(raw_id)
        if chunk_id >= 0 and all_chunks[chunk_id][1] == target_doc:
            candidate_ids.append(chunk_id)

    if use_mmr:
        chosen_ids = _mmr_select(query_matrix, candidate_ids, top_k, diversity)
    else:
        chosen_ids = candidate_ids[:top_k]
    return [all_chunks[chunk_id][0] for chunk_id in chosen_ids]


# Generate response
def generate_response(query, retrieved_chunks):
    """Summarise *retrieved_chunks* (a list of strings) in the context of *query*.

    Returns a fixed message when no chunks were retrieved, because running the
    summariser on an empty context would produce text unrelated to the
    documents.
    """
    if not retrieved_chunks:
        return "No relevant context was found for this query."

    context = " ".join(retrieved_chunks)
    input_text = f"Query: {query}\nContext: {context}"
    # The tokenizer call truncates the prompt to at most 1024 tokens.
    inputs = tokenizer(input_text, return_tensors='pt', max_length=1024, truncation=True)
    summary_ids = generation_model.generate(
        inputs['input_ids'],
        attention_mask=inputs['attention_mask'],
        max_length=150,
        min_length=40,
        length_penalty=2.0,
        num_beams=4,
        early_stopping=True,
    )
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)


def rag_system(query, use_mmr=False):
    """Retrieve the top 3 chunks for *query*, generate a response, print and return it."""
    retrieved_chunks = retrieve_chunks(query, top_k=3, use_mmr=use_mmr)
    response = generate_response(query, retrieved_chunks)
    print(response)
    return response


def query_rag_system(query, use_mmr):
    """Gradio callback: forward the textbox and checkbox values to `rag_system`."""
    return rag_system(query, use_mmr=use_mmr)


interface = gr.Interface(
    fn=query_rag_system,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter your query here..."),
        gr.Checkbox(label="Use MMR"),
    ],
    outputs="text",
    title="RAG System",
    description="Enter a query to get a response from the RAG system. Optionally, use MMR for better results.",
)

interface.launch()