Shreyas094's picture
Update app.py
302823e verified
raw
history blame
10.3 kB
import requests
from bs4 import BeautifulSoup
import gradio as gr
from huggingface_hub import InferenceClient
import random
import urllib.parse
from datetime import datetime, timedelta
import re
import os
import PyPDF2
# List of user agents to rotate through
_useragent_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"}
def query_llama(payload):
"""Send a query to the Llama model via Hugging Face API"""
try:
print(f"Payload: {payload}") # Debug: Print payload
response = requests.post(API_URL, headers=headers, json=payload)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error querying Llama model: {e}")
return None
def google_search(term, num_results=1, lang="en", timeout=30, safe="active", ssl_verify=None, days_back=90):
"""Perform a Google search and return results"""
print(f"Searching for term: {term}")
# Calculate the date range
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
# Format dates as strings
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
# Add the date range to the search term
search_term = f"{term} financial earnings report after:{start_date_str} before:{end_date_str}"
escaped_term = urllib.parse.quote_plus(search_term)
start = 0
all_results = []
max_attempts = num_results * 2 # Allow for some failed attempts
with requests.Session() as session:
attempts = 0
while len(all_results) < num_results and attempts < max_attempts:
try:
# Choose a random user agent
user_agent = random.choice(_useragent_list)
headers = {'User-Agent': user_agent}
resp = session.get(
url="https://www.google.com/search",
headers=headers,
params={
"q": search_term,
"num": num_results - len(all_results),
"hl": lang,
"start": start,
"safe": safe,
},
timeout=timeout,
verify=ssl_verify,
)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
result_block = soup.find_all("div", attrs={"class": "g"})
if not result_block:
print("No more results found.")
break
for result in result_block:
if len(all_results) >= num_results:
break
link = result.find("a", href=True)
if link:
link = link["href"]
print(f"Found link: {link}")
try:
webpage = session.get(link, headers=headers, timeout=timeout)
webpage.raise_for_status()
visible_text = extract_text_from_webpage(webpage.text)
all_results.append({"link": link, "text": visible_text})
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
print(f"403 Forbidden error for {link}, skipping...")
else:
print(f"HTTP error {e.response.status_code} for {link}, skipping...")
except requests.exceptions.RequestException as e:
print(f"Error fetching or processing {link}: {e}")
else:
print("No link found in result.")
start += len(result_block)
attempts += 1
except requests.exceptions.RequestException as e:
print(f"Error fetching search results: {e}")
attempts += 1
print(f"Total results fetched: {len(all_results)}")
return all_results
def extract_text_from_webpage(html_content):
"""Extract visible text from HTML content"""
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text
text = soup.get_text()
# Break into lines and remove leading and trailing space on each
lines = (line.strip() for line in text.splitlines())
# Break multi-headlines into a line each
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
# Drop blank lines
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
def filter_relevant_content(text):
"""Filter out irrelevant content"""
# List of keywords related to financial reports
keywords = ['revenue', 'profit', 'earnings', 'financial', 'quarter', 'fiscal', 'growth', 'income', 'loss', 'dividend']
# Split the text into sentences
sentences = re.split(r'(?<=[.!?])\s+', text)
# Filter sentences containing at least one keyword
relevant_sentences = [sentence for sentence in sentences if any(keyword in sentence.lower() for keyword in keywords)]
# Join the relevant sentences back into a single string
filtered_text = ' '.join(relevant_sentences)
return filtered_text
def chunk_text(text, max_chunk_size=1000, overlap=100):
# List of keywords that might indicate new sections
section_keywords = ["revenue", "income", "profit", "loss", "expenses", "outlook", "forecast", "quarter", "year"]
# Split text into sentences
sentences = re.split(r'(?<=[.!?])\s+', text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) > max_chunk_size:
# If adding this sentence exceeds max_chunk_size, start a new chunk
chunks.append(current_chunk.strip())
current_chunk = sentence + " "
elif any(keyword in sentence.lower() for keyword in section_keywords):
# If sentence contains a section keyword, start a new chunk
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + " "
else:
current_chunk += sentence + " "
# Add the last chunk if it's not empty
if current_chunk:
chunks.append(current_chunk.strip())
# Add overlap
overlapped_chunks = []
for i, chunk in enumerate(chunks):
if i > 0:
chunk = chunks[i-1][-overlap:] + chunk
if i < len(chunks) - 1:
chunk = chunk + chunks[i+1][:overlap]
overlapped_chunks.append(chunk)
return overlapped_chunks
def summarize_text(text, context_instructions):
chunks = chunk_text(text, max_chunk_size=3000, overlap=200)
summaries = []
for chunk in chunks:
prompt = f"""You are a financial analyst. Summarize the following text from a financial perspective:
{chunk}
{context_instructions}"""
summary = query_llama({"inputs": prompt, "parameters": {"max_length": 1000}})
if summary and isinstance(summary, list) and 'generated_text' in summary[0]:
summaries.append(summary[0]['generated_text'])
# Combine summaries
combined_summary = "\n\n".join(summaries)
# Final summarization of combined summaries
final_prompt = f"""As a financial analyst, provide a coherent and comprehensive summary of the following financial information:
{combined_summary}
Focus on the most important financial implications and analysis."""
final_summary = query_llama({"inputs": final_prompt, "parameters": {"max_length": 3000}})
if final_summary and isinstance(final_summary, list) and 'generated_text' in final_summary[0]:
return final_summary[0]['generated_text']
else:
return "Unable to generate summary due to an error."
def summarize_financial_news(query, read_pdf=False, pdf=None):
"""Search for financial news, extract relevant content
, and summarize"""
all_filtered_text = ""
if read_pdf and pdf is not None:
pdf_text = extract_text_from_pdf(pdf)
all_filtered_text += pdf_text + "\n\n"
else:
search_results = google_search(query, num_results=1)
for result in search_results:
if result['text']:
filtered_text = filter_relevant_content(result['text'])
all_filtered_text += filtered_text + "\n\n"
if not all_filtered_text:
return "No relevant financial information found."
context_instructions = "Provide a detailed, coherent summary focusing on financial implications and analysis."
return summarize_text(all_filtered_text, context_instructions)
def extract_text_from_pdf(pdf):
"""Extract text from each page of the PDF"""
reader = PyPDF2.PdfFileReader(pdf)
text = ""
for page_num in range(reader.getNumPages()):
page = reader.getPage(page_num)
text += page.extract_text() + "\n"
return text
# Gradio Interface
def interface_function(query, read_pdf, pdf):
return summarize_financial_news(query, read_pdf, pdf)
iface = gr.Interface(
fn=interface_function,
inputs=[
gr.Textbox(lines=2, placeholder="Enter a company name or financial topic..."),
gr.Checkbox(label="Read PDF"),
gr.File(label="Upload PDF", type="file")
],
outputs="text",
title="Financial News Summarizer",
description="Enter a company name or financial topic to get a summary of recent financial news. Optionally, upload a PDF to summarize its content."
)
iface.launch()