import feedparser
from datetime import datetime, timedelta
from typing import Any, List
from urllib.parse import quote

from newspaper import Article

from llama_index import (
    Document,
    PromptTemplate,
    ServiceContext,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.postprocessor import SentenceTransformerRerank
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.retrievers import BaseRetriever, BM25Retriever


class NewsFeedParser:
def __init__(self):
"""
Initializes the NewsFeedParser class.
"""
self.articles_data = []
def create_search_urls(self, question, base_urls):
"""
Converts a question into properly formatted search URLs for multiple base URLs.
Parameters:
question (str): The query or question to be converted into search URLs.
base_urls (list): A list of base URLs for the search services.
Returns:
list: A list of formatted search URLs, each containing the encoded question.
"""
# URL Encoding
encoded_question = quote(question)
# Constructing the full URLs for each base URL
search_urls = [base_url + encoded_question for base_url in base_urls]
return search_urls
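    # Illustrative example (not from the original source): with
    # base_urls = ["https://news.google.com/rss/search?q="], calling
    # create_search_urls("US inflation", base_urls) returns
    # ["https://news.google.com/rss/search?q=US%20inflation"], since
    # quote() percent-encodes the space in the question.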
def parse_feed(self, rss_url):
"""
Parses the RSS feed from a given URL and processes each entry.
Parameters:
rss_url (str): URL of the RSS feed to be parsed.
"""
news_feed = feedparser.parse(rss_url)
content = news_feed.entries
# Get the current date
current_date = datetime.now()
        for entry in content:
            # Parse the RFC 822-style publication date; skip entries whose
            # date is missing or in an unexpected format.
            date_format = "%a, %d %b %Y %H:%M:%S %Z"
            try:
                published_date = datetime.strptime(entry.published, date_format)
            except (AttributeError, ValueError):
                continue
            # Check if the article is within the last week
            if current_date - published_date <= timedelta(days=7):
# Extract the article text
article_text = self.extract_article_text(entry.link)
# Only add to the list if article text is successfully extracted
if article_text:
self.articles_data.append({
'link': entry.link,
'published': published_date.strftime("%Y-%m-%d %H:%M:%S"),
'article_text': article_text
})
def extract_article_text(self, url):
"""
Extracts text from a given article URL.
Parameters:
url (str): The URL of the article from which to extract text.
Returns:
str: Extracted article text. Returns None if extraction fails.
"""
try:
article = Article(url)
article.download()
article.parse()
return article.text
        except Exception as e:
            print(f"Failed to extract article from {url}: {e}")
return None
def process_query(self, input_query):
"""
Processes an input query to extract articles related to the query from multiple sources.
Parameters:
input_query (str): The query from which to extract information.
Returns:
list: A list of articles with their details.
"""
# Define base URLs for multiple news sources
base_urls = [
'https://news.google.com/rss/search?q=',
'http://www.ft.com/rss/markets?q='
#'https://www.bloomberg.com/search?query='
]
# Step 1: Create search URLs for each base URL
search_urls = self.create_search_urls(input_query, base_urls)
# Step 2: Parse the feed for each search URL
for url in search_urls:
self.parse_feed(url)
# Return the accumulated articles from all feeds
return self.articles_data
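    # Illustrative shape of each returned item (values are made up):
    #   {'link': 'https://news.google.com/articles/...',
    #    'published': '2024-01-05 14:30:00',
    #    'article_text': 'Full extracted article text ...'}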
def chunk_text_by_words_with_overlap(self, text, max_words, overlap, metadata):
"""
        Splits the text into chunks of a specified number of words with a specified
        overlap and attaches metadata to each chunk. Requires max_words > overlap
        so the stride (max_words - overlap) is positive.
        """
        words = text.split()
        # Step through the words with a stride of (max_words - overlap) so that
        # consecutive chunks share `overlap` words.
        chunks = [' '.join(words[i:i + max_words]) for i in range(0, len(words), max_words - overlap)]
        # Merge a trailing chunk shorter than the overlap into the previous
        # chunk so we don't emit a tiny fragment.
        if len(chunks) > 1 and len(chunks[-1].split()) < overlap:
            chunks[-2] = ' '.join(chunks[-2:])
            chunks.pop(-1)
        return [{'text': chunk, **metadata} for chunk in chunks]
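    # Worked example (illustrative, not from the original source): with
    # max_words=250 and overlap=20 the stride is 250 - 20 = 230, so a 600-word
    # article yields chunks starting at word offsets 0, 230, and 460. The final
    # chunk holds ~140 words, more than the 20-word overlap, so no merge occurs.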
def process_and_chunk_articles(self, input_query, max_words=250, overlap=20):
"""
Processes an input query, fetches related articles, and chunks their text.
Returns a list of Document objects with attached metadata.
"""
# Process the query and get articles
articles = self.process_query(input_query)
        print(f"Fetched {len(articles)} articles")
        # Chunk each article's text and create Document objects with the source
        # link and publication date attached as metadata.
        documents = []
        for article in articles:
            article_chunks = self.chunk_text_by_words_with_overlap(
                article['article_text'],
                max_words,
                overlap,
                metadata={'link': article['link'], 'published': article['published']}
            )
            for chunk in article_chunks:
                documents.append(Document(
                    text=chunk['text'],
                    metadata={'link': chunk['link'], 'published_date': chunk['published']},
                ))
        return documents
class HybridRetriever(BaseRetriever):
"""
A hybrid retriever that combines results from two different retrieval methods:
vector-based retrieval and BM25 retrieval.
Attributes:
vector_retriever: An instance of a retriever that uses vector embeddings for retrieval.
bm25_retriever: An instance of a retriever that uses BM25 algorithm for retrieval.
    The class inherits from BaseRetriever and implements its `_retrieve` interface.
"""
def __init__(self, vector_retriever, bm25_retriever):
"""
Initializes the HybridRetriever with two different types of retrievers.
Args:
vector_retriever: The retriever instance which uses vector-based retrieval methods.
bm25_retriever: The retriever instance which uses BM25 algorithm for retrieval.
"""
self.vector_retriever = vector_retriever
self.bm25_retriever = bm25_retriever
super().__init__()
def _retrieve(self, query, **kwargs):
"""
Performs a retrieval operation by combining results from both the vector and BM25 retrievers.
Args:
query: The query string based on which the documents are to be retrieved.
**kwargs: Additional keyword arguments that might be required for retrieval.
Returns:
all_nodes: A list of nodes (documents) retrieved by combining results from both retrievers.
This ensures a diverse set of results leveraging the strengths of both retrieval methods.
"""
# Retrieve nodes using BM25 retriever
bm25_nodes = self.bm25_retriever.retrieve(query, **kwargs)
# Retrieve nodes using vector retriever
vector_nodes = self.vector_retriever.retrieve(query, **kwargs)
# Combine the two lists of nodes, ensuring no duplicates
all_nodes = []
node_ids = set()
for n in bm25_nodes + vector_nodes:
# Check if node is already added; if not, add it to the list
if n.node.node_id not in node_ids:
all_nodes.append(n)
node_ids.add(n.node.node_id)
return all_nodes
class NewsQueryEngine:
"""
    A class that sets up a query engine and answers questions over the fetched news documents.
    It encapsulates the creation of the prompt template, service context, index,
    hybrid retriever, reranker, and response synthesizer.
    Attributes:
        documents (List): A list of Document objects to be indexed.
        llm: The language model used to synthesize answers.
        embed_model: The embedding model used to index the documents.
        qa_prompt_tmpl (str): Template for creating query prompts.
    Methods:
        setup_query_engine(): Sets up the query engine with all necessary components.
"""
def __init__(self, documents: List[Any], llm: Any, embed_model: Any):
self.documents = documents
self.llm = llm
self.embed_model = embed_model
        self.qa_prompt_tmpl = (
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "As an experienced financial analyst and researcher, you are tasked with helping "
            "fellow analysts in their research using the latest financial news.\n"
            "Your answer must be based on the snippets of recent news provided as context information for each query.\n"
            "For each query, provide a concise answer derived from the news provided.\n"
            "Do not assume any critical information that might impact the answer.\n"
            "Note any major caveats or conflicting reports in the news provided.\n"
            "If a query cannot be answered due to lack of information in the context, state this explicitly.\n"
            "Query: {query_str}\n"
            "Answer:"
        )
def setup_query_engine(self) -> Any:
"""
Sets up the query engine by initializing and configuring the embedding model, service context, index,
hybrid retriever (combining vector and BM25 retrievers), and the response synthesizer. Returns the configured query engine.
"""
# Initialize the service context with the language model and embedding model
service_context = ServiceContext.from_defaults(llm=self.llm, embed_model=self.embed_model)
# Create an index from documents
index = VectorStoreIndex.from_documents(documents=self.documents, service_context=service_context)
nodes = service_context.node_parser.get_nodes_from_documents(self.documents)
# Set up vector and BM25 retrievers
vector_retriever = index.as_retriever(similarity_top_k=5)
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=5)
hybrid_retriever = HybridRetriever(vector_retriever, bm25_retriever)
# Configure the response synthesizer with the prompt template
qa_prompt = PromptTemplate(self.qa_prompt_tmpl)
response_synthesizer = get_response_synthesizer(
service_context=service_context,
text_qa_template=qa_prompt,
response_mode="tree_summarize",
)
# Assemble the query engine with a reranker and the synthesizer
reranker = SentenceTransformerRerank(top_n=4, model="BAAI/bge-reranker-base")
query_engine = RetrieverQueryEngine.from_args(
retriever=hybrid_retriever,
node_postprocessors=[reranker],
response_synthesizer=response_synthesizer,
)
return query_engine
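

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). It
# assumes the legacy llama_index (<0.10) API that this file imports, and uses
# OpenAI models as stand-ins; swap in whichever LLM and embedding model your
# environment provides.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from llama_index.llms import OpenAI
    from llama_index.embeddings import OpenAIEmbedding

    # Fetch, filter, and chunk the last week's news for a sample query.
    news_parser = NewsFeedParser()
    docs = news_parser.process_and_chunk_articles("US inflation outlook")

    # Build the hybrid query engine and ask a question over the news.
    engine = NewsQueryEngine(
        documents=docs,
        llm=OpenAI(model="gpt-3.5-turbo"),
        embed_model=OpenAIEmbedding(),
    ).setup_query_engine()
    response = engine.query("What is the latest outlook for US inflation?")
    print(response)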