File size: 10,401 Bytes
69d3198
 
da48dbc
 
69d3198
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da48dbc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69d3198
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e94170
da48dbc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# helper.py
import yfinance as yf
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from typing import List, Tuple, Dict


def download_stock_data(
    tickers: List[str], start_date: str, end_date: str, w: int
) -> pd.DataFrame:
    """
    Fetch adjusted close prices from Yahoo Finance for a set of tickers.

    Args:
    tickers (List[str]): Ticker symbols to request.
    start_date (str): Start of the requested range in 'YYYY-MM-DD' format.
    end_date (str): End of the requested range in 'YYYY-MM-DD' format.
    w (int): Value forwarded unchanged as the ``interval`` argument of
        ``yf.download``.
        NOTE(review): annotated as int here, but yfinance documents
        ``interval`` as strings such as "1d" or "1wk" - confirm what
        callers actually pass.

    Returns:
    pd.DataFrame: The "Adj Close" selection of the downloaded data.
    """
    request_options = {"start": start_date, "end": end_date, "interval": w}
    downloaded = yf.download(tickers, **request_options)

    # Only the adjusted close prices are handed back to the caller.
    adjusted_close = downloaded["Adj Close"]
    return adjusted_close


def download_stocks(tickers: List[str]) -> List[pd.DataFrame]:
    """
    Download the history of each ticker from Yahoo Finance.

    Each ticker is requested separately with ``yf.download`` using its
    default arguments, and only the last ``255 * 8`` rows of every result
    are kept.

    Args:
        tickers: A list of stock tickers.

    Returns:
        A list of Pandas DataFrames, one per ticker, in the same order as
        ``tickers``.
    """
    # Number of trailing rows retained from every download.
    rows_to_keep = 255 * 8

    return [yf.download(symbol).tail(rows_to_keep) for symbol in tickers]


def create_portfolio_and_calculate_returns(
    stock_data: pd.DataFrame, top_n: int, window_size: int = 1
) -> pd.DataFrame:
    """
    Build a momentum-style portfolio and compare it with an equal-weight benchmark.

    Period returns are computed with ``pct_change`` and rows containing NaN are
    dropped. The returns are then walked in consecutive windows of
    ``window_size`` rows: the ``top_n`` columns with the highest mean return in
    one window are held, equally weighted, during the following window.

    Args:
    stock_data (pd.DataFrame): Price table with one column per asset.
    top_n (int): Number of best-performing assets selected in each window.
    window_size (int): Number of rows per look-back / holding window.
        Defaults to 1, which is the value that used to be hard-coded.

    Returns:
    pd.DataFrame: The asset returns without their last ``window_size`` rows,
    plus these columns:
        - "benchmark": row-wise mean of the first five asset columns.
        - "portfolio_returns": equal-weight return of the selected assets.
        - "portfolio_history": list of the assets selected for that row.
        - "rolling_benchmark" / "rolling_portfolio_returns": cumulative
          products of (1 + return) for the two series.

    Note:
        The k-th portfolio return is realised in returns row
        ``k + window_size`` but is stored in row ``k``, so the portfolio
        columns are stamped one window earlier than the period in which the
        return occurred, while "benchmark" uses row ``k`` itself.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    # pct_change() returns a new frame, so the caller's data is never mutated.
    returns_data = stock_data.pct_change().dropna()

    portfolio_history = []  # Selected assets, one entry per held row
    portfolio_returns = []  # Realised portfolio return, one entry per held row

    # Rank on one window, then hold the winners during the next window.
    for start in range(0, len(returns_data) - window_size, window_size):
        end = start + window_size
        top_stocks = (
            returns_data.iloc[start:end]
            .mean()
            .sort_values(ascending=False)
            .head(top_n)
            .index.tolist()
        )
        next_window = returns_data.iloc[end : end + window_size][top_stocks].mean(
            axis=1
        )

        portfolio_returns.extend(next_window)
        # The final holding window may be shorter than window_size.
        portfolio_history.extend([top_stocks] * len(next_window))

    # Explicit copy so the new columns are written to an independent frame.
    new_returns_data = returns_data.iloc[:-window_size].copy()

    # Vectorised equivalent of averaging the first five columns row by row.
    new_returns_data["benchmark"] = new_returns_data.iloc[:, 0:5].mean(axis=1)
    new_returns_data["portfolio_returns"] = portfolio_returns
    new_returns_data["portfolio_history"] = portfolio_history
    new_returns_data["rolling_benchmark"] = (
        new_returns_data["benchmark"] + 1
    ).cumprod()
    new_returns_data["rolling_portfolio_returns"] = (
        new_returns_data["portfolio_returns"] + 1
    ).cumprod()

    return new_returns_data


def portfolio_annualised_performance(
    weights: np.ndarray, mean_returns: np.ndarray, cov_matrix: np.ndarray
) -> Tuple[float, float]:
    """
    Compute the annualised volatility and annualised return of a weighted portfolio.

    Args:
        weights (np.ndarray): Portfolio weight of each asset.

        mean_returns (np.ndarray): Mean per-period return of each asset, in the
                                   same order as ``weights``.

        cov_matrix (np.ndarray): Covariance matrix of the asset returns, with
                                 rows and columns in the same order as ``weights``.

    Returns:
        Tuple of (annualised volatility, annualised return) - volatility comes first.
    """
    # Scaling factor: 252 trading days per year.
    periods_per_year = 252

    # Weighted mean return per period, scaled up to a year.
    weighted_mean = np.sum(mean_returns * weights)
    annual_return = weighted_mean * periods_per_year

    # Portfolio variance is w^T * C * w; volatility scales with sqrt(time).
    covariance_times_weights = np.dot(cov_matrix, weights)
    variance = np.dot(weights.T, covariance_times_weights)
    annual_volatility = np.sqrt(variance) * np.sqrt(periods_per_year)

    return annual_volatility, annual_return


def random_portfolios(
    num_portfolios: int,
    num_weights: int,
    mean_returns: np.ndarray,
    cov_matrix: np.ndarray,
    risk_free_rate: float,
) -> Tuple[np.ndarray, List[np.ndarray]]:
    """
    Simulate randomly weighted portfolios and score each one.

    Args:
        num_portfolios (int): How many random portfolios to draw.

        num_weights (int): Number of weights (assets) in every portfolio.

        mean_returns (np.ndarray): The mean (expected) returns of the assets.

        cov_matrix (np.ndarray): The covariance matrix of the asset returns.

        risk_free_rate (float): The risk-free rate subtracted from the
                                annualised return when computing the Sharpe ratio.

    Returns:
        Tuple of results and weights_record.

        results (np.ndarray): Array of shape (3, num_portfolios). Row 0 holds the
                              annualised volatility, row 1 the annualised return
                              and row 2 the Sharpe ratio of each portfolio.

        weights_record (List[np.ndarray]): The normalised weights of each
                                           portfolio, in simulation order.
    """
    results = np.zeros((3, num_portfolios))
    weights_record = []

    for column in range(num_portfolios):
        # Draw uniform random numbers and rescale them so they sum to 1.
        draw = np.random.random(num_weights)
        weights = draw / np.sum(draw)
        weights_record.append(weights)

        volatility, annual_return = portfolio_annualised_performance(
            weights, mean_returns, cov_matrix
        )
        sharpe_ratio = (annual_return - risk_free_rate) / volatility

        # One column per simulated portfolio.
        results[:, column] = (volatility, annual_return, sharpe_ratio)

    return results, weights_record


def _allocation_table(weights: np.ndarray, columns: pd.Index) -> pd.DataFrame:
    """Build a one-row "allocation" table of percentage weights labelled by asset."""
    # float() keeps Python's built-in rounding for every value.
    percentages = [round(float(weight) * 100, 2) for weight in weights]
    return pd.DataFrame({"allocation": percentages}, index=columns).T


def display_simulated_ef_with_random(
    table: pd.DataFrame,
    mean_returns: List[float],
    cov_matrix: np.ndarray,
    num_portfolios: int,
    risk_free_rate: float,
) -> Tuple[plt.Figure, Dict[str, object]]:
    """
    Plot a simulated efficient frontier from randomly generated portfolios.

    Random portfolios are generated with ``random_portfolios``; the one with the
    highest Sharpe ratio and the one with the lowest volatility are highlighted
    on a volatility/return scatter plot and summarised in the returned dict.

    Args:
    - table (pd.DataFrame): Table whose columns label the assets; only
      ``table.columns`` is used, to name the allocation columns.
    - mean_returns (List): A list of mean returns for each security or asset in the portfolio.
    - cov_matrix (ndarray): A covariance matrix for the securities or assets in the portfolio.
    - num_portfolios (int): The number of random portfolios to generate.
    - risk_free_rate (float): The risk-free rate of return.

    Returns:
    - fig (plt.Figure): A pyplot figure object with the scatter plot.
    - summary (dict): Rounded annualised return/volatility and the allocation
      tables of both highlighted portfolios. "Annualised Return" and
      "Annualised Volatility" keep the minimum-volatility figures they have
      always resolved to; the "Max Sharpe ..." and "Min Volatility ..."
      prefixed keys expose both portfolios unambiguously.
    """

    # Generate random portfolios using the specified parameters
    results, weights = random_portfolios(
        num_portfolios, len(mean_returns), mean_returns, cov_matrix, risk_free_rate
    )

    # Portfolio with the highest Sharpe ratio (row 2 of results)
    max_sharpe_idx = np.argmax(results[2])
    sdp, rp = results[0, max_sharpe_idx], results[1, max_sharpe_idx]
    max_sharpe_allocation = _allocation_table(weights[max_sharpe_idx], table.columns)

    # Portfolio with the lowest volatility (row 0 of results)
    min_vol_idx = np.argmin(results[0])
    sdp_min, rp_min = results[0, min_vol_idx], results[1, min_vol_idx]
    min_vol_allocation = _allocation_table(weights[min_vol_idx], table.columns)

    # Scatter every simulated portfolio, coloured by its Sharpe ratio
    fig, ax = plt.subplots(figsize=(10, 7))
    ax.scatter(
        results[0, :],
        results[1, :],
        c=results[2, :],
        cmap="YlGnBu",
        marker="o",
        s=10,
        alpha=0.3,
    )
    ax.scatter(sdp, rp, marker="*", color="r", s=500, label="Maximum Sharpe ratio")
    ax.scatter(
        sdp_min, rp_min, marker="*", color="g", s=500, label="Minimum volatility"
    )
    ax.set_title("Simulated Portfolio Optimization based on Efficient Frontier")
    ax.set_xlabel("Annual volatility")
    ax.set_ylabel("Annual returns")
    ax.legend(labelspacing=0.8)

    summary = {
        # Legacy keys: the dict literal used to list these twice, so the
        # minimum-volatility values always won. They are kept with that same
        # value so existing callers see no change.
        "Annualised Return": round(rp_min, 2),
        "Annualised Volatility": round(sdp_min, 2),
        # Distinct keys so the maximum-Sharpe figures are no longer lost.
        "Max Sharpe Annualised Return": round(rp, 2),
        "Max Sharpe Annualised Volatility": round(sdp, 2),
        "Max Sharpe Allocation": max_sharpe_allocation,
        "Max Sharpe Allocation in Percentile": max_sharpe_allocation.div(
            max_sharpe_allocation.sum(axis=1), axis=0
        ),
        "Min Volatility Annualised Return": round(rp_min, 2),
        "Min Volatility Annualised Volatility": round(sdp_min, 2),
        "Min Volatility Allocation": min_vol_allocation,
        "Min Volatility Allocation in Percentile": min_vol_allocation.div(
            min_vol_allocation.sum(axis=1), axis=0
        ),
    }

    return fig, summary