# Spaces: Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
from NSEDownload import stocks | |
from time import sleep | |
import os | |
from gradio_rangeslider import RangeSlider | |
from gradio_calendar import Calendar | |
import datetime | |
def is_weekday(date: datetime.date) -> bool:
    """Return True when *date* falls on Monday through Friday.

    Only ``date.weekday()`` is consulted (Monday == 0 ... Sunday == 6), so
    both ``datetime.date`` and ``datetime.datetime`` values are accepted.
    """
    return date.weekday() < 5
## Get price history from NSE for the requested date range (cached per symbol)
def get_data(symb, year1, year2):
    """Download price history for *symb* and cache it as ``<SYMBOL>.csv``.

    Args:
        symb: Stock symbol; upper-cased before use.
        year1: Start of the range; must provide ``strftime`` (e.g. a datetime).
        year2: End of the range; must provide ``strftime``.

    Returns:
        None. The data is written to ``<SYMBOL>.csv`` in the working directory.

    Raises:
        ValueError: If the download yields no rows, so that an empty file is
            never written and then mistaken for a valid cache entry.
    """
    symb = symb.upper()
    csv_path = f'{symb}.csv'

    # NOTE(review): the cache is keyed on the symbol only, so once a file
    # exists a different date range is silently ignored. Kept as-is because
    # other functions in this file read the same '<SYMBOL>.csv' path.
    if os.path.exists(csv_path):
        return

    # The downloader is called with day-month-year strings.
    start_date = year1.strftime("%d-%m-%Y")
    end_date = year2.strftime("%d-%m-%Y")

    df = stocks.get_data(stock_symbol=symb, start_date=start_date, end_date=end_date)
    # Assumes the downloader returns a pandas DataFrame (or None on failure)
    # -- TODO confirm against the NSEDownload documentation.
    if df is None or df.empty:
        raise ValueError(f'No data returned for {symb} between {start_date} and {end_date}')

    # Move the index into a regular column so it is written to the CSV.
    df = df.reset_index(drop=False)
    df.to_csv(csv_path, index=False, encoding='utf-8')
# Calculate profit/loss based on stock price movement after condition is met
def _find_exit(closes, buy_price):
    """Choose the exit for one trade from the closes of its monitored window.

    Returns ``(offset, sell_price, no_trigger)`` where ``offset`` is the
    position inside *closes* of the row whose close is the sell price.
    """
    best_price = None
    best_offset = None
    for offset, close_price in enumerate(closes):
        change_percent = (close_price - buy_price) / buy_price * 100

        # Hard exits: take profit at +8% or stop out at -3%, at that close.
        if change_percent >= 8 or change_percent <= -3:
            return offset, close_price, 0

        if change_percent >= 2:
            # Remember the highest close seen inside the +2%..+8% band.
            if best_price is None or close_price > best_price:
                best_price, best_offset = close_price, offset
            # NOTE(review): monitoring only stops inside the band when the gain
            # equals a threshold exactly, so in practice the loop runs on and
            # the best banded close is picked with hindsight. This selection
            # rule is preserved unchanged; confirm the intended trailing stop.
            if change_percent in (2, 3, 5):
                break

    if best_price is None:
        # Nothing triggered: exit at the last monitored close.
        return len(closes) - 1, closes[-1], 1
    return best_offset, best_price, 0


def calculate_profit_loss(stock_data, days_to_monitor):
    """Simulate one trade per signal row and summarise the returns.

    For every row whose ``'condition'`` equals 1, the trade buys at the next
    row's ``'Open Price'`` and monitors ``days_to_monitor`` rows starting with
    that buy row (fewer if the data ends first). A signal on the final row has
    no next row to buy on and is skipped.

    Args:
        stock_data: DataFrame with ``'Date'``, ``'Open Price'``,
            ``'Close Price'`` and ``'condition'`` columns, in date order.
        days_to_monitor: Number of rows to monitor per trade (at least 1).

    Returns:
        ``(trades, summary)``: ``trades`` has one row per trade with buy/sell
        date and price, ``'Profit/Loss (%)'`` and ``'No trigger'``; ``summary``
        is a one-row frame of win probability, median, mean, best and worst
        return (all NaN when there are no trades).

    Raises:
        ValueError: If ``days_to_monitor`` is smaller than 1.
    """
    days_to_monitor = int(days_to_monitor)
    if days_to_monitor < 1:
        raise ValueError('days_to_monitor must be at least 1')

    trade_columns = ['Buy Date', 'Sell Date', 'Buy Price', 'Sell Price',
                     'Profit/Loss (%)', 'No trigger']
    summary_columns = ['+ve trade probability', 'Median returns', 'Mean returns',
                       'Best return', 'Worst return']

    dates = stock_data['Date']
    opens = stock_data['Open Price'].to_numpy()
    closes = stock_data['Close Price'].to_numpy()
    signal_positions = (stock_data['condition'] == 1).to_numpy().nonzero()[0]
    row_count = len(stock_data)

    buy_sell_actions = []
    for position in map(int, signal_positions):
        entry = position + 1
        if entry >= row_count:
            continue  # signal on the last row: there is no next day to buy on

        buy_price = opens[entry]
        window = closes[entry:entry + days_to_monitor]
        offset, sell_price, no_trigger = _find_exit(window, buy_price)

        buy_sell_actions.append({
            'Buy Date': dates.iloc[entry],
            # Report the date of the row that supplied the sell price.
            'Sell Date': dates.iloc[entry + offset],
            'Buy Price': buy_price,
            'Sell Price': sell_price,
            'Profit/Loss (%)': (sell_price - buy_price) / buy_price * 100,
            'No trigger': no_trigger,
        })

    dft = pd.DataFrame(buy_sell_actions, columns=trade_columns)

    if dft.empty:
        # No trades: every statistic is undefined rather than an error.
        stats = dict.fromkeys(summary_columns, float('nan'))
    else:
        returns = dft['Profit/Loss (%)']
        stats = {
            '+ve trade probability': round(int((returns > 0).sum()) / len(dft), 3),
            'Median returns': round(returns.median(), 3),
            'Mean returns': round(returns.mean(), 3),
            'Best return': round(returns.max(), 3),
            'Worst return': round(returns.min(), 3),
        }
    dff = pd.DataFrame([stats], columns=summary_columns)
    return dft, dff
# Load the cached price history, compute RSI columns and back-test the signal
def _simple_rsi(close, window):
    """Return the RSI of *close* using simple rolling means of gains and losses.

    The result shares the index of *close*, so it can be assigned straight
    back onto the frame the prices came from.
    """
    delta = close.diff(1)
    # The first delta is NaN; it counts as "no gain, no loss" (0).
    gain = delta.where(delta > 0, 0.0)
    loss = (-delta).where(delta < 0, 0.0)
    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))


def _sliding_weekly_rsi(data, window):
    """Add a ``'weekly RSI'`` column computed on every 7th row, for each offset.

    NOTE(review): rows are sampled every 7 rows, not every 7 calendar days;
    confirm this is the intended definition of "weekly".
    """
    sampled_frames = []
    for offset in range(7):
        sampled = data.iloc[offset::7].reset_index(drop=True)
        sampled['weekly RSI'] = _simple_rsi(sampled['Close Price'], window)
        sampled_frames.append(sampled)
    combined = pd.concat(sampled_frames, ignore_index=True)
    return combined.sort_values(by='Date').reset_index(drop=True)


def get_stock_data(stock_name, date1, date2, rsi_window, days_to_monitor, previous_n_days, rsi_threshold1, rsi_threshold2):
    """Fetch prices for a symbol, flag RSI signal days and back-test them.

    A row is flagged (``'condition'`` == 1) when the Daily RSI of each of the
    ``previous_n_days`` preceding rows is below ``rsi_threshold1`` and the
    row's own Daily RSI lies within ``rsi_threshold2`` (inclusive bounds).
    All other rows get ``'condition'`` == 0.

    Args:
        stock_name: Stock symbol; upper-cased before use.
        date1: Start of the download range (passed to ``get_data``).
        date2: End of the download range (passed to ``get_data``).
        rsi_window: Rolling window length for both RSI columns.
        days_to_monitor: Passed through to ``calculate_profit_loss``.
        previous_n_days: Number of preceding rows checked against
            ``rsi_threshold1`` (at least 1).
        rsi_threshold1: Upper bound (exclusive) for the preceding rows' RSI.
        rsi_threshold2: ``(low, high)`` pair for the current row's RSI.

    Returns:
        ``(fstock, profit_data, summary_data)``: the flagged rows, the trade
        list and the returns summary.
    """
    stock_name = stock_name.upper()
    rsi_window = int(rsi_window)
    previous_n_days = int(previous_n_days)

    get_data(stock_name, date1, date2)
    stock_data = pd.read_csv(f'{stock_name}.csv')

    # Ensure the 'Date' column is in datetime format
    stock_data['Date'] = pd.to_datetime(stock_data['Date'])

    # Sort by date, drop repeated dates, and renumber the rows. Renumbering
    # matters: the RSI is assigned back by index label, so stale pre-sort
    # labels would attach values to the wrong dates.
    stock_data = (
        stock_data.sort_values(by='Date')
        .drop_duplicates(subset=['Date'], keep='first')
        .reset_index(drop=True)
    )

    stock_data['Daily RSI'] = _simple_rsi(stock_data['Close Price'], rsi_window)
    stock_data = _sliding_weekly_rsi(stock_data, rsi_window)

    ## Applying the condition
    daily_rsi = stock_data['Daily RSI']
    # Highest RSI over the previous N rows (current row excluded). It is NaN
    # while fewer than N valid values are available, which compares as False.
    prior_peak = daily_rsi.shift(1).rolling(window=previous_n_days).max()
    low, high = rsi_threshold2
    condition_met = (prior_peak < rsi_threshold1) & daily_rsi.between(low, high)
    # Always create the column so that "no signal at all" is not a KeyError.
    stock_data['condition'] = condition_met.astype(int)

    fstock = stock_data[stock_data['condition'] == 1].reset_index(drop=True)
    profit_data, summary_data = calculate_profit_loss(stock_data, days_to_monitor)
    return fstock, profit_data, summary_data
# Gradio click handler: run the analysis and return the outputs plus the CSV path
def save_to_csv(stock_input, date1, date2, rsi_window, days_to_monitor, previous_n_days, rsi_threshold1, rsi_threshold2):
    """Validate the form values, run the analysis and return the four outputs.

    Args:
        stock_input: Symbol as typed by the user.
        date1: Selected start date.
        date2: Selected end date.
        rsi_window, days_to_monitor, previous_n_days, rsi_threshold1,
        rsi_threshold2: Passed through unchanged to ``get_stock_data``.

    Returns:
        ``(fstock, profit_data, summary_data, csv_file_path)`` where
        ``csv_file_path`` is ``'<SYMBOL>.csv'``.

    Raises:
        gr.Error: If the symbol is empty or contains characters other than
            letters, digits, ``&``, ``-`` and ``_``, if a date is missing, or
            if the start date is after the end date.
    """
    stock_name = (stock_input or '').strip().upper()
    if not stock_name:
        raise gr.Error("Please enter a stock symbol.")
    # The symbol becomes part of a file path that is read and then offered for
    # download, so anything that could form a path (separators, dots) is refused.
    if not all(ch.isalnum() or ch in '&-_' for ch in stock_name):
        raise gr.Error("The stock symbol may only contain letters, digits, '&', '-' and '_'.")
    if date1 is None or date2 is None:
        raise gr.Error("Please select both a starting and an ending date.")
    if date1 > date2:
        raise gr.Error("The starting date must not be after the ending date.")

    fstock, profit_data, summary_data = get_stock_data(
        stock_name, date1, date2, rsi_window, days_to_monitor,
        previous_n_days, rsi_threshold1, rsi_threshold2,
    )
    csv_file_path = f'{stock_name}.csv'
    return fstock, profit_data, summary_data, csv_file_path
# Create the Gradio interface: inputs in the left column, results in the right
with gr.Blocks() as demo:
    # Page heading, given as raw HTML inside a Markdown component.
    gr.Markdown(
        '\n'
        '<h1 style="text-align: center; color: #4CAF50;">Stock Analysis Interface</h1>\n'
        '<p style="text-align: center;">Enter a stock Symbol and Calculate the algo returns.</p>\n'
    )
    with gr.Row():
        with gr.Column():
            stock_input = gr.Textbox(label="Enter Stock Symbol", placeholder="e.g., CANBK", lines=1)
            date1 = Calendar(type="datetime", label="Select starting date", info="Click the calendar icon to bring up the calendar.")
            date2 = Calendar(type="datetime", label="Select ending date", info="Click the calendar icon to bring up the calendar.")
            rsi_window_slider = gr.Slider(minimum=1, maximum=30, value=14, label="RSI Window (Days)", step=1)
            days_to_monitor = gr.Slider(minimum=1, maximum=30, value=8, label="Days to monitor stock once condition is met", step=1)
            rsi_threshold1 = gr.Slider(minimum=1, maximum=100, value=65, label="Previous RSI threshold", step=1)
            previous_n_days = gr.Slider(minimum=1, maximum=180, value=30, label="N -> days to check for RSI threshold", step=1)
            rsi_threshold2 = RangeSlider(label="Current RSI Range", minimum=0, maximum=100, value=[65, 70])
            text1 = "### Wait for few minutes after Submit for the report to generate."
            range_ = gr.Markdown(value=text1)
            # Moving the range slider re-writes the same notice into the Markdown.
            rsi_threshold2.change(lambda s: text1, rsi_threshold2, range_,
                                  show_progress="hide", trigger_mode="always_last")
            submit_button = gr.Button("Submit", variant="primary")
        with gr.Column():
            gr.Markdown("<h3 style='text-align: center;'>Output</h3>")
            output_stock_data = gr.DataFrame(label="Dates where Conditions met", interactive=False)
            output_pl_data = gr.DataFrame(label="Profit and Loss Statement", interactive=False)
            output_summary_data = gr.DataFrame(label="Returns Summary", interactive=False)
            csv_download = gr.File(label="Download the full CSV")
    # When the button is clicked, show the three dataframes and provide a downloadable CSV.
    # The inputs are listed in the same order as the parameters of save_to_csv.
    submit_button.click(
        save_to_csv,
        inputs=[stock_input, date1, date2, rsi_window_slider, days_to_monitor, previous_n_days, rsi_threshold1, rsi_threshold2],
        outputs=[output_stock_data, output_pl_data, output_summary_data, csv_download],
    )
# Launch the Gradio interface
demo.launch()