# helper.py
import os
import time
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import stripe
import streamlit as st
import yfinance as yf
from langchain import LLMChain, PromptTemplate
from langchain.agents import initialize_agent, load_tools

# The LangChain LLM wrapper is imported under an alias: importing it as plain
# `OpenAI` would be shadowed by the `openai` client class imported right below,
# and the agent would then be built with the wrong class.
from langchain.llms import OpenAI as LangChainOpenAI
from openai import OpenAI


def download_stock_data(
    tickers: List[str], start_date: str, end_date: str, w: str
) -> pd.DataFrame:
    """
    Download adjusted close prices for the given tickers between two dates.

    Args:
        tickers (List[str]): List of stock ticker symbols.
        start_date (str): Start date for data retrieval in 'YYYY-MM-DD' format.
        end_date (str): End date for data retrieval in 'YYYY-MM-DD' format.
        w (str): Sampling interval forwarded to ``yf.download`` as ``interval``
            (yfinance expects strings such as "1d", "1wk" or "1mo").

    Returns:
        pd.DataFrame: The "Adj Close" slice of the downloaded data.
    """
    # auto_adjust is pinned to False so that the "Adj Close" column is present
    # regardless of the installed yfinance version's default.
    data = yf.download(
        tickers, start=start_date, end=end_date, interval=w, auto_adjust=False
    )
    return data["Adj Close"]


def download_stocks(tickers: List[str]) -> List[pd.DataFrame]:
    """
    Download stock data from Yahoo Finance, one request per ticker.

    Args:
        tickers: A list of stock tickers.

    Returns:
        A list of Pandas DataFrames, one for each ticker (in input order), each
        truncated to its last ``255 * 8`` rows.
    """
    # 255 * 8 rows: presumably about eight years of daily bars -- confirm.
    return [yf.download(ticker).tail(255 * 8) for ticker in tickers]


def create_portfolio_and_calculate_returns(
    stock_data: pd.DataFrame, top_n: int, window_size: int = 1
) -> pd.DataFrame:
    """
    Build a momentum portfolio and compute its returns window by window.

    For every window of ``window_size`` rows, the ``top_n`` columns with the
    highest mean return are selected; the equally weighted mean return of those
    columns over the *following* window is recorded as the portfolio return.

    Args:
        stock_data (pd.DataFrame): Price data, one column per stock.
        top_n (int): Number of best-performing stocks to hold in each window.
        window_size (int): Number of rows per look-back / holding window.
            Defaults to 1, which was previously the hard-coded value.

    Returns:
        pd.DataFrame: The per-period returns (last ``window_size`` rows dropped)
        with these extra columns:
            - "benchmark": mean return of the first five columns,
            - "portfolio_returns": return of the selected portfolio,
            - "portfolio_history": list of tickers held,
            - "rolling_benchmark" / "rolling_portfolio_returns": cumulative
              products of (1 + return).

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    # Period-over-period returns; the first row is NaN and is dropped.
    returns_data = stock_data.pct_change()
    returns_data.dropna(inplace=True)

    portfolio_history = []  # Tickers held for each recorded period
    portfolio_returns = []  # Portfolio return for each recorded period

    # Walk over the data in non-overlapping windows of `window_size` rows.
    for start in range(0, len(returns_data) - window_size, window_size):
        end = start + window_size

        # Rank stocks by their mean return inside the look-back window.
        current_window = returns_data.iloc[start:end]
        top_stocks = (
            current_window.mean()
            .sort_values(ascending=False)
            .head(top_n)
            .index.tolist()
        )

        # Equal-weight return of the selected stocks over the next window.
        next_window = returns_data.iloc[end : end + window_size][top_stocks].mean(
            axis=1
        )
        portfolio_returns.extend(next_window)
        portfolio_history.extend([top_stocks] * len(next_window))

    # NOTE(review): portfolio returns realised in rows [window_size, N) are
    # stored against rows [0, N - window_size), i.e. shifted relative to the
    # benchmark column -- confirm this alignment is intended.
    # An explicit copy keeps the column assignments below from writing into a
    # slice of `returns_data`.
    new_returns_data = returns_data.iloc[0:-window_size, :].copy()

    # Benchmark: mean of the first five columns, computed before any derived
    # column is appended.
    new_returns_data["benchmark"] = new_returns_data.iloc[:, 0:5].mean(axis=1)
    new_returns_data["portfolio_returns"] = portfolio_returns
    new_returns_data["portfolio_history"] = portfolio_history
    new_returns_data["rolling_benchmark"] = (
        new_returns_data["benchmark"] + 1
    ).cumprod()
    new_returns_data["rolling_portfolio_returns"] = (
        new_returns_data["portfolio_returns"] + 1
    ).cumprod()

    return new_returns_data


def portfolio_annualised_performance(
    weights: np.ndarray, mean_returns: np.ndarray, cov_matrix: np.ndarray
) -> Tuple[float, float]:
    """
    Compute the annualised volatility and expected return of a portfolio.

    Args:
        weights (np.ndarray): The weights of the assets in the portfolio. Each
            weight corresponds to the proportion of the total investment in the
            corresponding asset.
        mean_returns (np.ndarray): The mean (expected) returns of the assets.
        cov_matrix (np.ndarray): The covariance matrix of the asset returns.
            Each entry is the covariance between the returns of the assets
            corresponding to its row and column.

    Returns:
        Tuple of portfolio volatility (standard deviation) and portfolio
        expected return, both annualised.
    """
    # Weighted mean return scaled by 252 (number of trading days in a year).
    returns = np.sum(mean_returns * weights) * 252

    # sqrt(w^T . Cov . w) is the per-period standard deviation; multiplying by
    # sqrt(252) annualises it.
    std = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights))) * np.sqrt(252)

    return std, returns


def random_portfolios(
    num_portfolios: int,
    num_weights: int,
    mean_returns: np.ndarray,
    cov_matrix: np.ndarray,
    risk_free_rate: float,
) -> Tuple[np.ndarray, List[np.ndarray]]:
    """
    Generate random portfolios and calculate their standard deviation, returns
    and Sharpe ratio.

    Args:
        num_portfolios (int): The number of random portfolios to generate.
        num_weights (int): The number of weights (assets) in each portfolio.
        mean_returns (np.ndarray): The mean (expected) returns of the assets.
        cov_matrix (np.ndarray): The covariance matrix of the asset returns.
        risk_free_rate (float): The risk-free rate of return.

    Returns:
        Tuple of results and weights_record.
            results (np.ndarray): Array of shape (3, num_portfolios); row 0 is
                the standard deviation, row 1 the return and row 2 the Sharpe
                ratio of each portfolio.
            weights_record (List[np.ndarray]): The weights of the assets in
                each portfolio, in generation order.
    """
    results = np.zeros((3, num_portfolios))
    weights_record = []

    for i in range(num_portfolios):
        # Random weights, normalised so they sum to 1.
        weights = np.random.random(num_weights)
        weights /= np.sum(weights)
        weights_record.append(weights)

        portfolio_std_dev, portfolio_return = portfolio_annualised_performance(
            weights, mean_returns, cov_matrix
        )

        results[0, i] = portfolio_std_dev
        results[1, i] = portfolio_return
        # Sharpe ratio: excess return per unit of volatility.
        results[2, i] = (portfolio_return - risk_free_rate) / portfolio_std_dev

    return results, weights_record


def display_simulated_ef_with_random(
    table: pd.DataFrame,
    mean_returns: List[float],
    cov_matrix: np.ndarray,
    num_portfolios: int,
    risk_free_rate: float,
) -> Tuple[plt.Figure, Dict[str, object]]:
    """
    Plot a simulated efficient frontier from randomly generated portfolios and
    summarise the maximum-Sharpe and minimum-volatility portfolios.

    Args:
        - table (pd.DataFrame): Only its columns are used, as the asset labels
          of the allocation tables.
        - mean_returns (List): A list of mean returns for each asset.
        - cov_matrix (ndarray): A covariance matrix for the assets.
        - num_portfolios (int): The number of random portfolios to generate.
        - risk_free_rate (float): The risk-free rate of return.

    Returns:
        - fig (plt.Figure): A pyplot figure object.
        - summary (dict): Allocation tables and annualised figures. The keys
          "Annualised Return" and "Annualised Volatility" hold the
          minimum-volatility figures, as they always have (the original dict
          literal repeated those keys, so the max-Sharpe values were
          overwritten). The "Max Sharpe Annualised ..." and
          "Min Volatility Annualised ..." keys expose both sets unambiguously.
    """
    results, weights = random_portfolios(
        num_portfolios, len(mean_returns), mean_returns, cov_matrix, risk_free_rate
    )

    # Portfolio with the highest Sharpe ratio (row 2 of `results`).
    max_sharpe_idx = np.argmax(results[2])
    sdp, rp = results[0, max_sharpe_idx], results[1, max_sharpe_idx]
    max_sharpe_allocation = pd.DataFrame(
        weights[max_sharpe_idx], index=table.columns, columns=["allocation"]
    )
    # Express the weights as percentages rounded to two decimals.
    max_sharpe_allocation["allocation"] = [
        round(w * 100, 2) for w in max_sharpe_allocation["allocation"]
    ]
    max_sharpe_allocation = max_sharpe_allocation.T

    # Portfolio with the lowest volatility (row 0 of `results`).
    min_vol_idx = np.argmin(results[0])
    sdp_min, rp_min = results[0, min_vol_idx], results[1, min_vol_idx]
    min_vol_allocation = pd.DataFrame(
        weights[min_vol_idx], index=table.columns, columns=["allocation"]
    )
    min_vol_allocation["allocation"] = [
        round(w * 100, 2) for w in min_vol_allocation["allocation"]
    ]
    min_vol_allocation = min_vol_allocation.T

    # Scatter of all simulated portfolios, coloured by Sharpe ratio, with the
    # two highlighted portfolios drawn as stars.
    fig, ax = plt.subplots(figsize=(10, 7))
    ax.scatter(
        results[0, :],
        results[1, :],
        c=results[2, :],
        cmap="YlGnBu",
        marker="o",
        s=10,
        alpha=0.3,
    )
    ax.scatter(sdp, rp, marker="*", color="r", s=500, label="Maximum Sharpe ratio")
    ax.scatter(
        sdp_min, rp_min, marker="*", color="g", s=500, label="Minimum volatility"
    )
    ax.set_title("Simulated Portfolio Optimization based on Efficient Frontier")
    ax.set_xlabel("Annual volatility")
    ax.set_ylabel("Annual returns")
    ax.legend(labelspacing=0.8)

    summary = {
        # Legacy keys: kept first and with their historical (min-volatility)
        # values so existing callers see no change.
        "Annualised Return": round(rp_min, 2),
        "Annualised Volatility": round(sdp_min, 2),
        "Max Sharpe Allocation": max_sharpe_allocation,
        "Max Sharpe Allocation in Percentile": max_sharpe_allocation.div(
            max_sharpe_allocation.sum(axis=1), axis=0
        ),
        "Min Volatility Allocation": min_vol_allocation,
        "Min Volatility Allocation in Percentile": min_vol_allocation.div(
            min_vol_allocation.sum(axis=1), axis=0
        ),
        # Unambiguous keys, one pair per highlighted portfolio.
        "Max Sharpe Annualised Return": round(rp, 2),
        "Max Sharpe Annualised Volatility": round(sdp, 2),
        "Min Volatility Annualised Return": round(rp_min, 2),
        "Min Volatility Annualised Volatility": round(sdp_min, 2),
    }
    return fig, summary


OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]


def run_langchain_agent_(question: str = "What is your question?") -> str:
    """
    Run a LangChain zero-shot ReAct agent to answer a question.

    The function instantiates a LangChain OpenAI LLM, loads the "wikipedia" and
    "llm-math" tools, initialises the agent and runs it on the question, which
    is passed to the agent verbatim.

    Parameters:
        question (str): The question to be answered by the agent.
            Defaults to "What is your question?".

    Returns:
        str: The output response from the agent after processing the question.
    """
    # LangChain's LLM wrapper (not the raw `openai` client) with a low
    # temperature setting.
    llm = LangChainOpenAI(temperature=0.1)

    # NOTE(review): a "financial advisor" PromptTemplate / LLMChain used to be
    # built here but was never passed to the agent; the dead code was removed.
    # Wire a prompt into the agent explicitly if that framing is wanted.

    # Tools the agent may call while reasoning.
    tools = load_tools(["wikipedia", "llm-math"], llm=llm)

    agent = initialize_agent(
        tools,
        llm,
        agent="zero-shot-react-description",
        verbose=True,
        max_iterations=5,
    )

    return agent.run(question)


openai_client = OpenAI(api_key=OPENAI_API_KEY)


def call_gpt(prompt: str, content: str) -> str:
    """
    Ask the gpt-3.5-turbo chat model a question with background content.

    The conversation sent to the API consists of a system message, the user's
    question, an assistant turn asking for background, and the background
    content as a final user turn.

    Args:
        prompt (str): The main question or topic that the user wants to address.
        content (str): Additional background information relevant to the prompt.

    Returns:
        str: The content of the first choice returned by the model.

    Note:
        Uses the module-level ``openai_client`` created at import time.
    """
    messages = [
        # System message defining the assistant's role
        {"role": "system", "content": "You are a helpful assistant."},
        # User message containing the prompt
        {"role": "user", "content": f"I want to ask you a question: {prompt}"},
        # Assistant message asking for background content
        {"role": "assistant", "content": "What is the background content?"},
        # User providing the background content
        {"role": "user", "content": content},
    ]
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content


# Set your secret key. Remember to switch to your live secret key in production!
stripe.api_key = os.environ["STRIPE_API_KEY"]

# Set the Stripe price id used as the single checkout line item.
stripe_price_id = os.environ["STRIPE_PRICE_ID"]


def create_checkout_session() -> Tuple[Optional[str], Optional[str]]:
    """
    Create a Stripe Checkout Session for one unit of ``stripe_price_id``.

    Returns:
        Tuple of (session id, session URL). On any error the message is shown
        through ``st.error`` and ``(None, None)`` is returned.
    """
    # NOTE(review): the success URL carries a literal `key=123` query
    # parameter; if it is meant to be a secret it should come from
    # configuration rather than source -- confirm.
    try:
        session = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=[
                {
                    "price": stripe_price_id,
                    "quantity": 1,
                }
            ],
            mode="payment",
            success_url="https://39701j614g.execute-api.us-east-1.amazonaws.com/dev/?sessionID={CHECKOUT_SESSION_ID}&key=123",
            cancel_url="https://huggingface.co/spaces/eagle0504/Momentum-Strategy-Screener",
        )
        return session.id, session.url
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        st.error(f"Error creating checkout session: {e}")
        return None, None


def check_payment_status(session_id: str) -> bool:
    """
    Report whether the given Stripe Checkout Session has been paid.

    Args:
        session_id (str): Identifier of the Checkout Session to look up.

    Returns:
        bool: True when the session's ``payment_status`` equals "paid"; False
        otherwise, including when the lookup fails (the error is shown through
        ``st.error``).
    """
    try:
        session = stripe.checkout.Session.retrieve(session_id)
        return session.payment_status == "paid"
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        st.error(f"Error checking payment status: {e}")
        return False