Prospea / app.py
RyumaShisui's picture
Update app.py
d19bd0b verified
raw
history blame
13.2 kB
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
# --- Data Processing Class ---
class DataProcessor:
    """Load a CSV into a DataFrame, classify its columns and build charts."""

    def __init__(self):
        # DataFrame produced by the last successful read in load_data().
        self.data = None
        # Column names grouped by kind; rebuilt by _classify_columns().
        self.numeric_columns = []
        self.categorical_columns = []
        self.date_columns = []

    def load_data(self, file) -> bool:
        """Read ``file`` as CSV and classify its columns.

        Args:
            file: Anything ``pandas.read_csv`` accepts.

        Returns:
            True when the data was read and classified; False otherwise, in
            which case the error is reported through ``st.error``.
        """
        # Broad catch is deliberate: this is the UI boundary, and any failure
        # while reading user-supplied data is shown instead of crashing.
        try:
            self.data = pd.read_csv(file)
            self._classify_columns()
            return True
        except Exception as e:
            st.error(f"Error loading data: {str(e)}")
            return False

    def _classify_columns(self):
        """Sort every column of ``self.data`` into numeric, date or categorical.

        A non-numeric column counts as a date column when
        ``pandas.to_datetime`` can parse it. The parsed values are discarded,
        so the column itself keeps its original dtype.
        """
        # Build fresh lists so a repeated load never accumulates stale or
        # duplicated names from an earlier classification.
        numeric, dates, categorical = [], [], []
        for col in self.data.columns:
            series = self.data[col]
            if pd.api.types.is_numeric_dtype(series):
                numeric.append(col)
            elif pd.api.types.is_datetime64_any_dtype(series):
                dates.append(col)
            else:
                try:
                    pd.to_datetime(series)
                except (ValueError, TypeError, OverflowError):
                    # Not parseable as dates: treat as categorical.
                    categorical.append(col)
                else:
                    dates.append(col)
        self.numeric_columns = numeric
        self.date_columns = dates
        self.categorical_columns = categorical

    def get_basic_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the loaded data.

        Returns:
            An empty dict when nothing is loaded; otherwise a dict with keys
            ``summary`` (``describe()`` of the numeric columns, or an empty
            DataFrame when there are none), ``missing_values`` (null count
            per column), ``row_count`` and ``column_count``.
        """
        if self.data is None:
            return {}
        if self.numeric_columns:
            summary = self.data[self.numeric_columns].describe()
        else:
            # describe() raises ValueError on a frame without columns.
            summary = pd.DataFrame()
        return {
            'summary': summary,
            'missing_values': self.data.isnull().sum(),
            'row_count': len(self.data),
            'column_count': len(self.data.columns),
        }

    def create_visualization(self, chart_type: str, x_col: str, y_col: str,
                             color_col: Optional[str] = None) -> "go.Figure":
        """Build a Plotly Express figure from the loaded data.

        Args:
            chart_type: One of "Line Plot", "Bar Plot", "Scatter Plot" or
                "Box Plot"; any other value produces a histogram of ``x_col``.
            x_col: Column for the x axis.
            y_col: Column for the y axis (ignored for histograms).
            color_col: Optional column used to colour the marks.

        Returns:
            The figure created by the matching ``plotly.express`` function.
        """
        plotters = {
            "Line Plot": px.line,
            "Bar Plot": px.bar,
            "Scatter Plot": px.scatter,
            "Box Plot": px.box,
        }
        plot = plotters.get(chart_type)
        if plot is None:
            # Fallback for "Histogram" and any unrecognised chart type.
            return px.histogram(self.data, x=x_col, color=color_col)
        return plot(self.data, x=x_col, y=y_col, color=color_col)
class BrainstormManager:
    """Collect product ideas through a Streamlit form and derive insights."""

    def __init__(self):
        # Make sure the per-session product store exists before it is used.
        if 'products' not in st.session_state:
            st.session_state.products = {}

    def generate_product_form(self) -> Tuple[Dict[str, Any], Dict[str, Any], bool]:
        """Render the product form and return what the user entered.

        Returns:
            A ``(basic_info, market_analysis, submitted)`` tuple: the two
            dicts hold the current widget values and ``submitted`` is the
            result of the "Save Product" form button.
        """
        with st.form("product_form"):
            basic_info = {
                "name": st.text_input("Product Name"),
                "category": st.selectbox("Category", ["Digital", "Physical", "Service"]),
                "description": st.text_area("Description"),
                "target_audience": st.multiselect(
                    "Target Audience",
                    ["Students", "Professionals", "Businesses", "Seniors", "Youth"],
                ),
                "price_range": st.slider("Price Range ($)", 0, 1000, (50, 200)),
                "launch_date": st.date_input("Expected Launch Date"),
            }
            st.subheader("Market Analysis")
            market_analysis = {
                "competitors": st.text_area("Main Competitors (one per line)"),
                "unique_features": st.text_area("Unique Selling Points"),
                "market_size": st.selectbox(
                    "Market Size",
                    ["Small", "Medium", "Large", "Enterprise"],
                ),
                "growth_potential": st.slider("Growth Potential", 1, 10),
            }
            submitted = st.form_submit_button("Save Product")
        return basic_info, market_analysis, submitted

    def analyze_product(self, product_data: Dict) -> Dict:
        """Compute all insights for one product.

        Args:
            product_data: Merged form values; missing keys are tolerated.

        Returns:
            Dict with keys ``market_opportunity``, ``suggested_price``,
            ``risk_factors`` and ``next_steps``.
        """
        return {
            "market_opportunity": self._calculate_opportunity_score(product_data),
            "suggested_price": self._suggest_price(product_data),
            "risk_factors": self._identify_risks(product_data),
            "next_steps": self._generate_next_steps(product_data),
        }

    def _calculate_opportunity_score(self, data: Dict) -> int:
        """Score the opportunity: +3 for a "Large" market, +2 for two or more
        audiences, +2 for growth potential above 7; capped at 10.

        The rules as written can reach at most 7.
        """
        score = 0
        if data.get("market_size") == "Large":
            score += 3
        # "or" fallbacks keep a key that is present but None from raising.
        if len(data.get("target_audience") or []) >= 2:
            score += 2
        if (data.get("growth_potential") or 0) > 7:
            score += 2
        return min(score, 10)

    def _suggest_price(self, data: Dict) -> float:
        """Return the midpoint of ``price_range``, raised by 50% for the
        "Enterprise" market size, rounded to two decimals."""
        low_high = data.get("price_range") or (0, 0)
        base_price = sum(low_high) / 2
        if data.get("market_size") == "Enterprise":
            base_price *= 1.5
        return round(base_price, 2)

    def _identify_risks(self, data: Dict) -> List[str]:
        """List risk messages for named competitors and a narrow audience."""
        risks = []
        competitors = data.get("competitors")
        if isinstance(competitors, str):
            # Whitespace-only input does not name any competitor.
            competitors = competitors.strip()
        if competitors:
            risks.append("Competitive market - differentiation crucial")
        if len(data.get("target_audience") or []) < 2:
            risks.append("Narrow target audience - consider expansion")
        return risks

    def _generate_next_steps(self, data: Dict) -> List[str]:
        """Return the standard next steps, plus an enterprise sales step when
        the market size is "Enterprise"."""
        steps = [
            "Create detailed product specification",
            "Develop MVP timeline",
            "Plan marketing strategy",
        ]
        if data.get("market_size") == "Enterprise":
            steps.append("Prepare enterprise sales strategy")
        return steps
# --- Sample Data Generation ---
def generate_sample_data(seed: Optional[int] = None) -> pd.DataFrame:
    """Generate one row of random sample metrics per day of January 2024.

    Args:
        seed: Optional seed for the random generator. With the default
            ``None`` every call produces different values; pass an int to get
            reproducible data.

    Returns:
        DataFrame with columns ``Date``, ``Revenue`` (normal, mean 1000,
        std 100), ``Users`` (integers in [100, 200)), ``Engagement``
        (uniform in [0.5, 0.9)) and ``Category`` (one of 'A', 'B', 'C').
    """
    dates = pd.date_range(start='2024-01-01', end='2024-01-31', freq='D')
    n_days = len(dates)
    # A local generator keeps the global NumPy random state untouched.
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        'Date': dates,
        'Revenue': rng.normal(1000, 100, n_days),
        'Users': rng.integers(100, 200, n_days),
        'Engagement': rng.uniform(0.5, 0.9, n_days),
        'Category': rng.choice(['A', 'B', 'C'], n_days),
    })
# --- Page Rendering Functions ---
def render_dashboard():
    """Render the dashboard page: four headline metrics and two charts.

    The figures come from ``generate_sample_data()``, so they are sample
    values rather than real measurements.
    """
    # The emoji was stored as mis-decoded UTF-8 bytes; restored here.
    st.header("📊 Performance Dashboard")
    data = generate_sample_data()

    # Headline metrics, one per column.
    revenue_box, users_box, engagement_box, days_box = st.columns(4)
    with revenue_box:
        st.metric("Total Revenue", f"${data['Revenue'].sum():,.2f}")
    with users_box:
        st.metric("Total Users", f"{data['Users'].sum():,}")
    with engagement_box:
        st.metric("Avg Engagement", f"{data['Engagement'].mean():.2%}")
    with days_box:
        st.metric("Active Days", len(data))

    # Two charts side by side.
    left, right = st.columns(2)
    with left:
        st.subheader("Revenue Trend")
        revenue_fig = px.line(data, x='Date', y='Revenue')
        st.plotly_chart(revenue_fig, use_container_width=True)
    with right:
        st.subheader("User Engagement by Category")
        engagement_fig = px.scatter(data, x='Date', y='Engagement',
                                    size='Users', color='Category')
        st.plotly_chart(engagement_fig, use_container_width=True)
def render_analytics():
    """Render the analytics page: CSV upload, preview, stats, charts, metrics.

    Nothing beyond the uploader is shown until a file has been uploaded and
    loaded successfully by ``DataProcessor.load_data``.
    """
    # The emoji was stored as mis-decoded UTF-8 bytes; restored here.
    st.header("🔍 Data Analytics")
    processor = DataProcessor()
    uploaded_file = st.file_uploader("Upload your CSV data", type=['csv'])
    if uploaded_file is None:
        return
    if not processor.load_data(uploaded_file):
        # load_data has already reported the error to the user.
        return

    st.success("Data loaded successfully!")
    tabs = st.tabs(["Data Preview", "Statistics", "Visualization", "Metrics"])

    with tabs[0]:
        st.subheader("Data Preview")
        st.dataframe(processor.data.head())
        st.info(f"Total rows: {len(processor.data)}, Total columns: {len(processor.data.columns)}")

    with tabs[1]:
        st.subheader("Basic Statistics")
        stats = processor.get_basic_stats()
        st.write(stats['summary'])
        st.subheader("Missing Values")
        st.write(stats['missing_values'])

    with tabs[2]:
        st.subheader("Create Visualization")
        col1, col2, col3 = st.columns(3)
        with col1:
            chart_type = st.selectbox(
                "Select Chart Type",
                ["Line Plot", "Bar Plot", "Scatter Plot", "Box Plot", "Histogram"]
            )
        with col2:
            x_col = st.selectbox("Select X-axis", processor.data.columns)
        with col3:
            # Histograms only need an x axis, so no y selector is shown.
            y_col = st.selectbox("Select Y-axis", processor.numeric_columns) if chart_type != "Histogram" else None
        color_col = st.selectbox("Select Color Variable (optional)",
                                 ['None'] + processor.categorical_columns)
        # 'None' is the sentinel entry meaning "no colour column".
        color_col = None if color_col == 'None' else color_col
        fig = processor.create_visualization(
            chart_type,
            x_col,
            y_col if y_col else x_col,
            color_col
        )
        st.plotly_chart(fig, use_container_width=True)

    with tabs[3]:
        st.subheader("Column Metrics")
        if not processor.numeric_columns:
            # Without this guard the selectbox returns None and indexing the
            # DataFrame with it raises KeyError.
            st.info("No numeric columns available for metrics.")
        else:
            selected_col = st.selectbox("Select column", processor.numeric_columns)
            series = processor.data[selected_col]
            metrics = {
                'Mean': series.mean(),
                'Median': series.median(),
                'Std Dev': series.std(),
                'Min': series.min(),
                'Max': series.max()
            }
            cols = st.columns(len(metrics))
            for col, (metric, value) in zip(cols, metrics.items()):
                col.metric(metric, f"{value:.2f}")
def render_brainstorm_page():
    """Render the brainstorm page: create a product or browse saved ones.

    Products are kept in ``st.session_state.products`` keyed by a generated
    ``prod_<n>`` id, each holding the form data, the computed insights and a
    creation timestamp.
    """
    st.title("Product Brainstorm Hub")
    manager = BrainstormManager()
    action = st.sidebar.radio("Action", ["View Products", "Create New Product"])

    if action == "Create New Product":
        basic_info, market_analysis, submitted = manager.generate_product_form()
        if not submitted:
            return
        if not (basic_info.get("name") or "").strip():
            # A nameless product would show up as an unlabeled expander.
            st.error("Product name is required.")
            return
        product_data = {**basic_info, **market_analysis}
        insights = manager.analyze_product(product_data)
        product_id = f"prod_{len(st.session_state.products)}"
        st.session_state.products[product_id] = {
            "data": product_data,
            "insights": insights,
            "created_at": str(datetime.now())
        }
        st.success("Product added! View insights in the Products tab.")
        return

    if not st.session_state.products:
        st.info("No products yet. Create one to get started!")
        return

    for product in st.session_state.products.values():
        details = product['data']
        insights = product['insights']
        with st.expander(f"🎯 {details['name']}"):
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Product Details")
                st.write(f"Category: {details['category']}")
                st.write(f"Target: {', '.join(details['target_audience'])}")
                st.write(f"Description: {details['description']}")
            with col2:
                st.subheader("Insights")
                st.metric("Opportunity Score", f"{insights['market_opportunity']}/10")
                # Two-decimal currency, matching the dashboard's formatting.
                st.metric("Suggested Price", f"${insights['suggested_price']:,.2f}")
                st.write("**Risk Factors:**")
                for risk in insights['risk_factors']:
                    st.write(f"- {risk}")
                st.write("**Next Steps:**")
                for step in insights['next_steps']:
                    st.write(f"- {step}")
def render_chat():
    """Render the chat page backed by ``st.session_state.messages``.

    Earlier messages are replayed on every run. A new prompt is stored and
    echoed, then answered with a fixed placeholder reply that is stored too.
    """
    # The emoji was stored as mis-decoded UTF-8 bytes; restored here.
    st.header("💬 Business Assistant")
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    prompt = st.chat_input("Ask about your business...")
    if not prompt:
        return

    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    # Placeholder answer; no model is called here.
    response = f"Thank you for your question about '{prompt}'. The LLM integration will be implemented soon."
    with st.chat_message("assistant"):
        st.markdown(response)
    st.session_state.messages.append({"role": "assistant", "content": response})
def main():
    """Configure the page, draw the sidebar navigation and render the
    selected page."""
    st.set_page_config(
        page_title="Prospira",
        # The emoji was stored as mis-decoded UTF-8 bytes; restored here.
        page_icon="📊",
        layout="wide",
        initial_sidebar_state="expanded"
    )

    # Navigation label -> function that renders that page.
    pages = {
        "Dashboard": render_dashboard,
        "Analytics": render_analytics,
        "Brainstorm": render_brainstorm_page,
        "Chat": render_chat,
    }

    with st.sidebar:
        st.title("Prospira")
        st.subheader("Data-Driven Solutions")
        page = st.radio(
            "Navigation",
            list(pages)
        )

    render_page = pages.get(page)
    if render_page is not None:
        render_page()
# Script entry point: run the app when this file is executed directly.
if __name__ == "__main__":
    main()
    # NOTE(review): looks like a leftover debug print. The file's original
    # indentation was lost, so confirm whether it belongs inside this guard
    # and consider removing it.
    print("rohith")