import requests
from bs4 import BeautifulSoup
import gradio as gr
import random
from datetime import datetime, timedelta
import re
import os
import PyPDF2

_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]

API_URL = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"}

def query_llama(payload):
    """Send a query to the Llama model via the Hugging Face Inference API."""
    try:
        print(f"Payload: {payload}")
        response = requests.post(API_URL, headers=headers, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error querying Llama model: {e}")
        return None
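
# Illustrative call; the response shape is assumed from the Inference API's
# text-generation task and may vary by model (e.g. an {"error": ...} dict
# while the model is still loading):
#   query_llama({"inputs": "Summarize: ...", "parameters": {"max_length": 200}})
#   -> [{"generated_text": "..."}]
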
def google_search(term, num_results=1, lang="en", timeout=30, safe="active", ssl_verify=None, days_back=90):
    """Perform a Google search and return results."""
    print(f"Searching for term: {term}")

    # Restrict results to the last `days_back` days via Google's date operators.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)
    start_date_str = start_date.strftime("%Y-%m-%d")
    end_date_str = end_date.strftime("%Y-%m-%d")

    search_term = f"{term} financial earnings report after:{start_date_str} before:{end_date_str}"
    start = 0
    all_results = []
    max_attempts = num_results * 2

    with requests.Session() as session:
        attempts = 0
        while len(all_results) < num_results and attempts < max_attempts:
            try:
                # Rotate user agents to reduce the chance of being blocked.
                user_agent = random.choice(_useragent_list)
                headers = {'User-Agent': user_agent}
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": search_term,
                        "num": num_results - len(all_results),
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()

                soup = BeautifulSoup(resp.text, "html.parser")
                result_block = soup.find_all("div", attrs={"class": "g"})
                if not result_block:
                    print("No more results found.")
                    break

                for result in result_block:
                    if len(all_results) >= num_results:
                        break
                    link = result.find("a", href=True)
                    if link:
                        link = link["href"]
                        print(f"Found link: {link}")
                        try:
                            webpage = session.get(link, headers=headers, timeout=timeout)
                            webpage.raise_for_status()
                            visible_text = extract_text_from_webpage(webpage.text)
                            all_results.append({"link": link, "text": visible_text})
                        except requests.exceptions.HTTPError as e:
                            if e.response.status_code == 403:
                                print(f"403 Forbidden error for {link}, skipping...")
                            else:
                                print(f"HTTP error {e.response.status_code} for {link}, skipping...")
                        except requests.exceptions.RequestException as e:
                            print(f"Error fetching or processing {link}: {e}")
                    else:
                        print("No link found in result.")

                start += len(result_block)
                attempts += 1
            except requests.exceptions.RequestException as e:
                print(f"Error fetching search results: {e}")
                attempts += 1

    print(f"Total results fetched: {len(all_results)}")
    return all_results
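
# Illustrative call (results depend on live Google HTML, which changes often;
# heavy scraping may be rate-limited, so a structured search API is safer in
# production):
#   results = google_search("NVIDIA", num_results=2)
#   -> [{"link": "https://...", "text": "..."}, ...]
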
def extract_text_from_webpage(html_content):
    """Extract visible text from HTML content."""
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove non-visible elements before extracting text.
    for script in soup(["script", "style"]):
        script.decompose()

    text = soup.get_text()

    # Normalize whitespace: strip each line, break multi-space runs into
    # phrases, and drop empty chunks.
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
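
# Quick sanity check (illustrative):
#   extract_text_from_webpage("<p>Revenue rose.</p><script>x()</script>")
#   -> "Revenue rose."
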
def filter_relevant_content(text):
    """Filter out irrelevant content, keeping sentences with financial keywords."""
    keywords = ['revenue', 'profit', 'earnings', 'financial', 'quarter', 'fiscal', 'growth', 'income', 'loss', 'dividend']

    # Split on sentence boundaries, then keep sentences containing any keyword.
    sentences = re.split(r'(?<=[.!?])\s+', text)
    relevant_sentences = [sentence for sentence in sentences if any(keyword in sentence.lower() for keyword in keywords)]

    filtered_text = ' '.join(relevant_sentences)
    return filtered_text

def chunk_text(text, max_chunk_size=1000, overlap=100):
    """Split text into chunks at sentence and section boundaries, then add
    overlap between neighboring chunks so context is not lost at the seams."""
    section_keywords = ["revenue", "income", "profit", "loss", "expenses", "outlook", "forecast", "quarter", "year"]

    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) > max_chunk_size:
            # Current chunk is full; start a new one.
            chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
        elif any(keyword in sentence.lower() for keyword in section_keywords):
            # A section keyword likely marks a topic change; start a new chunk.
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
        else:
            current_chunk += sentence + " "

    if current_chunk:
        chunks.append(current_chunk.strip())

    # Prepend/append a slice of the neighboring chunks for context overlap.
    overlapped_chunks = []
    for i, chunk in enumerate(chunks):
        if i > 0:
            chunk = chunks[i-1][-overlap:] + chunk
        if i < len(chunks) - 1:
            chunk = chunk + chunks[i+1][:overlap]
        overlapped_chunks.append(chunk)
    return overlapped_chunks
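
# Illustrative behavior of the overlap step: with overlap=5, neighboring
# chunks share a 5-character fringe, e.g. base chunks ["abcdefgh", "ijklmnop"]
# become ["abcdefgh" + "ijklm", "defgh" + "ijklmnop"].
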
def summarize_text(text, context_instructions):
    """Summarize text chunk by chunk, then combine the partial summaries."""
    chunks = chunk_text(text, max_chunk_size=3000, overlap=200)
    summaries = []
    for chunk in chunks:
        prompt = f"""You are a financial analyst. Summarize the following text from a financial perspective:
{chunk}
{context_instructions}"""
        summary = query_llama({"inputs": prompt, "parameters": {"max_length": 1000}})
        if summary and isinstance(summary, list) and 'generated_text' in summary[0]:
            summaries.append(summary[0]['generated_text'])

    combined_summary = "\n\n".join(summaries)

    # Second pass: condense the per-chunk summaries into one coherent summary.
    final_prompt = f"""As a financial analyst, provide a coherent and comprehensive summary of the following financial information:
{combined_summary}
Focus on the most important financial implications and analysis."""
    final_summary = query_llama({"inputs": final_prompt, "parameters": {"max_length": 3000}})
    if final_summary and isinstance(final_summary, list) and 'generated_text' in final_summary[0]:
        return final_summary[0]['generated_text']
    else:
        return "Unable to generate summary due to an error."

def summarize_financial_news(query, read_pdf=False, pdf=None):
    """Search for financial news, extract relevant content, and summarize."""
    all_filtered_text = ""
    if read_pdf and pdf is not None:
        pdf_text = extract_text_from_pdf(pdf)
        all_filtered_text += pdf_text + "\n\n"
    else:
        search_results = google_search(query, num_results=1)
        for result in search_results:
            if result['text']:
                filtered_text = filter_relevant_content(result['text'])
                all_filtered_text += filtered_text + "\n\n"

    if not all_filtered_text:
        return "No relevant financial information found."

    context_instructions = "Provide a detailed, coherent summary focusing on financial implications and analysis."
    return summarize_text(all_filtered_text, context_instructions)

def extract_text_from_pdf(pdf):
    """Extract text from each page of the PDF."""
    # PyPDF2 >= 3.0 removed PdfFileReader/getNumPages; use PdfReader and
    # iterate over reader.pages instead.
    reader = PyPDF2.PdfReader(pdf)
    text = ""
    for page in reader.pages:
        # extract_text() can return None for pages without extractable text.
        text += (page.extract_text() or "") + "\n"
    return text
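
# Note: PdfReader accepts either a file path or a binary file object, so this
# works with the gr.File(type="filepath") input used below.
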
def interface_function(query, read_pdf, pdf):
    return summarize_financial_news(query, read_pdf, pdf)

iface = gr.Interface(
    fn=interface_function,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter a company name or financial topic..."),
        gr.Checkbox(label="Read PDF"),
        # type="filepath" passes the uploaded file's path; type="file" is no
        # longer accepted by current Gradio releases.
        gr.File(label="Upload PDF", type="filepath"),
    ],
    outputs="text",
    title="Financial News Summarizer",
    description="Enter a company name or financial topic to get a summary of recent financial news. Optionally, upload a PDF to summarize its content."
)

if __name__ == "__main__":
    iface.launch()