# Hugging Face Spaces app: financial-news sentiment & summarization dashboard.
# Standard library
import threading
from datetime import datetime, timedelta

# Third-party
import plotly.graph_objs as go
import requests
import yfinance as yf
from bs4 import BeautifulSoup
from flask import Flask, render_template
from flask_socketio import SocketIO
from newspaper import Article, Config
from transformers import (
    BartForConditionalGeneration,
    BartTokenizer,
    BertForSequenceClassification,
    BertTokenizer,
    pipeline,
)
app = Flask(__name__) | |
socketio = SocketIO(app) | |
news_data = [] | |
# Set up sentiment analysis and summarization pipelines | |
finbert = BertForSequenceClassification.from_pretrained('yiyanghkust/finbert-tone', num_labels=3) | |
sentiment_tokenizer = BertTokenizer.from_pretrained('yiyanghkust/finbert-tone') | |
summarization_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large", forced_bos_token_id=0) | |
summarization_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large") | |
sentiment_analysis_pipeline = pipeline("sentiment-analysis", model=finbert, tokenizer=sentiment_tokenizer, truncation=True, max_length=512) | |
summarization_pipeline = pipeline("summarization", model=summarization_model, tokenizer=summarization_tokenizer, max_length=512, truncation=True) | |
# News API setup | |
def get_news_articles_info(ticker_name): | |
# URL of the search results page | |
url = "https://www.marketwatch.com/search?q=" + ticker_name + "&ts=0&tab=All%20News" | |
# Send an HTTP GET request to the URL | |
response = requests.get(url) | |
# Parse the HTML content using BeautifulSoup | |
soup = BeautifulSoup(response.content, "html.parser") | |
article_links = [] | |
for content in soup.find_all("h3",class_="article__headline"): | |
for link in content.find_all("a"): | |
if link['href'] != "#": | |
article_links.append(link['href']) | |
article_links = article_links[18:36] | |
ticker_news_extracted = [] | |
count = 0 | |
for link in article_links: | |
article_info = {} | |
article_info['text'] = '' | |
article_info['url'] = link | |
try: | |
url = article_info['url'] | |
url = requests.head(url, allow_redirects=True).url | |
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36' | |
config = Config() | |
config.browser_user_agent = user_agent | |
article = Article(url, config=config) | |
article.download() | |
article.parse() | |
article_info['title'] = article.title | |
article_info['text'] = article.text | |
article_info['url'] = url | |
count = count + 1 | |
print(count,url) | |
socketio.emit('log',str(count) + " " + url) | |
except Exception as error: | |
print("Error",url,error) | |
continue | |
if article_info['text'] == '': | |
print('No text',url) | |
continue | |
if count > 5: | |
break | |
ticker_news_extracted.append(article_info) | |
return ticker_news_extracted | |
def index(): | |
global news_data | |
news_data = [] | |
socketio.emit('news_data',news_data) | |
return render_template("index_socket.html") | |
def get_graph(ticker): | |
# Define the stock ticker symbol and the date range | |
ticker_symbol = ticker # Example: Apple Inc. | |
end_date = datetime.today() | |
start_date = end_date - timedelta(days=90) | |
# Fetch historical data using yfinance | |
data = yf.download(ticker_symbol, start=start_date, end=end_date) | |
# Create a candlestick graph using Plotly | |
fig = go.Figure(data=[go.Candlestick(x=data.index.strftime('%Y-%m-%d').tolist(), | |
open=data['Open'].tolist(), | |
high=data['High'].tolist(), | |
low=data['Low'].tolist(), | |
close=data['Close'].tolist())]) | |
# Customize the layout | |
fig.update_layout(title=f'Candlestick Chart for {ticker_symbol} in the Last 90 Days', | |
xaxis_title='Date', | |
yaxis_title='Price', | |
xaxis_rangeslider_visible=False) | |
# convert the fig to HTML DIV element | |
graph_data = fig.to_plotly_json() | |
socketio.emit('graph',graph_data) | |
print("Graph is finished") | |
def get_sentiment(_index,_content): | |
_sentiment = sentiment_analysis_pipeline(_content)[0] | |
news_data[_index]['sentiment'] = _sentiment["label"] | |
news_data[_index]['sentiment_score'] = round(_sentiment["score"], 3) | |
socketio.emit('news_data',news_data) | |
def get_summary(_index,_content): | |
_summary = summarization_pipeline(_content, max_length=100, min_length=30, do_sample=False)[0]["summary_text"] | |
news_data[_index]['summary'] = _summary | |
socketio.emit('news_data', news_data) | |
def get_news_data(ticker): | |
global news_data | |
news_data = [] | |
socketio.emit('news_data',news_data) | |
# Get top 5 news articles for the given ticker | |
search_results = get_news_articles_info(ticker_name=ticker) | |
articles = search_results[:5] | |
for article in articles: | |
news_data.append({ | |
"title": article['title'], | |
"link": article['url'], | |
"sentiment": 'Loading...', | |
"sentiment_score": 'Loading...', | |
"summary": 'Loading...', | |
"Ticker": ticker | |
}) | |
socketio.emit('news_data',news_data) | |
threads_list = [] | |
for i in range(0,len(articles)): | |
article = articles[i] | |
link = article['url'] | |
content = article['text'] | |
t1 = threading.Thread(target=get_sentiment, args=(i,content)) | |
t2 = threading.Thread(target=get_summary, args=(i,content)) | |
t1.start() | |
t2.start() | |
threads_list.append(t1) | |
threads_list.append(t2) | |
socketio.emit('log',link + " Analysis started") | |
for thread in threads_list: | |
thread.join() | |
socketio.emit('log','Analysis is finished') | |
if __name__ == "__main__": | |
socketio.run(app, host='0.0.0.0', port=7860) |