Spaces:
Running
Running
import streamlit as st | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import pandas as pd | |
import numpy as np | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Any | |
import streamlit as st | |
import streamlit.components.v1 as components | |
# --- Data Processing Class --- | |
class DataProcessor: | |
def __init__(self): | |
self.data = None | |
self.numeric_columns = [] | |
self.categorical_columns = [] | |
self.date_columns = [] | |
def load_data(self, file) -> bool: | |
try: | |
self.data = pd.read_csv(file) | |
self._classify_columns() | |
return True | |
except Exception as e: | |
st.error(f"Error loading data: {str(e)}") | |
return False | |
def _classify_columns(self): | |
for col in self.data.columns: | |
if pd.api.types.is_numeric_dtype(self.data[col]): | |
self.numeric_columns.append(col) | |
elif pd.api.types.is_datetime64_any_dtype(self.data[col]): | |
self.date_columns.append(col) | |
else: | |
try: | |
pd.to_datetime(self.data[col]) | |
self.date_columns.append(col) | |
except: | |
self.categorical_columns.append(col) | |
def get_basic_stats(self) -> Dict[str, Any]: | |
if self.data is None: | |
return {} | |
stats = { | |
'summary': self.data[self.numeric_columns].describe(), | |
'missing_values': self.data.isnull().sum(), | |
'row_count': len(self.data), | |
'column_count': len(self.data.columns) | |
} | |
return stats | |
def create_visualization(self, chart_type: str, x_col: str, y_col: str, color_col: str = None) -> go.Figure: | |
if chart_type == "Line Plot": | |
fig = px.line(self.data, x=x_col, y=y_col, color=color_col) | |
elif chart_type == "Bar Plot": | |
fig = px.bar(self.data, x=x_col, y=y_col, color=color_col) | |
elif chart_type == "Scatter Plot": | |
fig = px.scatter(self.data, x=x_col, y=y_col, color=color_col) | |
elif chart_type == "Box Plot": | |
fig = px.box(self.data, x=x_col, y=y_col, color=color_col) | |
else: | |
fig = px.histogram(self.data, x=x_col, color=color_col) | |
return fig | |
class BrainstormManager: | |
def __init__(self): | |
if 'products' not in st.session_state: | |
st.session_state.products = {} | |
def generate_product_form(self) -> Dict: | |
with st.form("product_form"): | |
basic_info = { | |
"name": st.text_input("Product Name"), | |
"category": st.selectbox("Category", ["Digital", "Physical", "Service"]), | |
"description": st.text_area("Description"), | |
"target_audience": st.multiselect("Target Audience", | |
["Students", "Professionals", "Businesses", "Seniors", "Youth"]), | |
"price_range": st.slider("Price Range ($)", 0, 1000, (50, 200)), | |
"launch_date": st.date_input("Expected Launch Date") | |
} | |
st.subheader("Market Analysis") | |
market_analysis = { | |
"competitors": st.text_area("Main Competitors (one per line)"), | |
"unique_features": st.text_area("Unique Selling Points"), | |
"market_size": st.selectbox("Market Size", | |
["Small", "Medium", "Large", "Enterprise"]), | |
"growth_potential": st.slider("Growth Potential", 1, 10) | |
} | |
submitted = st.form_submit_button("Save Product") | |
return basic_info, market_analysis, submitted | |
def analyze_product(self, product_data: Dict) -> Dict: | |
insights = { | |
"market_opportunity": self._calculate_opportunity_score(product_data), | |
"suggested_price": self._suggest_price(product_data), | |
"risk_factors": self._identify_risks(product_data), | |
"next_steps": self._generate_next_steps(product_data) | |
} | |
return insights | |
def _calculate_opportunity_score(self, data: Dict) -> int: | |
score = 0 | |
if data.get("market_size") == "Large": | |
score += 3 | |
if len(data.get("target_audience", [])) >= 2: | |
score += 2 | |
if data.get("growth_potential", 0) > 7: | |
score += 2 | |
return min(score, 10) | |
def _suggest_price(self, data: Dict) -> float: | |
base_price = sum(data.get("price_range", (0, 0))) / 2 | |
if data.get("market_size") == "Enterprise": | |
base_price *= 1.5 | |
return round(base_price, 2) | |
def _identify_risks(self, data: Dict) -> List[str]: | |
risks = [] | |
if data.get("competitors"): | |
risks.append("Competitive market - differentiation crucial") | |
if len(data.get("target_audience", [])) < 2: | |
risks.append("Narrow target audience - consider expansion") | |
return risks | |
def _generate_next_steps(self, data: Dict) -> List[str]: | |
steps = [ | |
"Create detailed product specification", | |
"Develop MVP timeline", | |
"Plan marketing strategy" | |
] | |
if data.get("market_size") == "Enterprise": | |
steps.append("Prepare enterprise sales strategy") | |
return steps | |
# --- Sample Data Generation --- | |
def generate_sample_data(): | |
dates = pd.date_range(start='2024-01-01', end='2024-01-31', freq='D') | |
return pd.DataFrame({ | |
'Date': dates, | |
'Revenue': np.random.normal(1000, 100, len(dates)), | |
'Users': np.random.randint(100, 200, len(dates)), | |
'Engagement': np.random.uniform(0.5, 0.9, len(dates)), | |
'Category': np.random.choice(['A', 'B', 'C'], len(dates)) | |
}) | |
# --- Page Rendering Functions --- | |
def render_dashboard(): | |
st.header("π Comprehensive Business Performance Dashboard") | |
# Generate sample data with more complex structure | |
data = generate_sample_data() | |
data['Profit_Margin'] = data['Revenue'] * np.random.uniform(0.1, 0.3, len(data)) | |
# Top-level KPI Section | |
col1, col2, col3, col4 = st.columns(4) | |
with col1: | |
st.metric("Total Revenue", | |
f"${data['Revenue'].sum():,.2f}", | |
delta=f"{data['Revenue'].pct_change().mean()*100:.2f}%") | |
with col2: | |
st.metric("Total Users", | |
f"{data['Users'].sum():,}", | |
delta=f"{data['Users'].pct_change().mean()*100:.2f}%") | |
with col3: | |
st.metric("Avg Engagement", | |
f"{data['Engagement'].mean():.2%}", | |
delta=f"{data['Engagement'].pct_change().mean()*100:.2f}%") | |
with col4: | |
st.metric("Profit Margin", | |
f"{data['Profit_Margin'].mean():.2%}", | |
delta=f"{data['Profit_Margin'].pct_change().mean()*100:.2f}%") | |
# Visualization Grid | |
col1, col2 = st.columns(2) | |
with col1: | |
st.subheader("Revenue & Profit Trends") | |
fig_revenue = go.Figure() | |
fig_revenue.add_trace(go.Scatter( | |
x=data['Date'], | |
y=data['Revenue'], | |
mode='lines', | |
name='Revenue', | |
line=dict(color='blue') | |
)) | |
fig_revenue.add_trace(go.Scatter( | |
x=data['Date'], | |
y=data['Profit_Margin'], | |
mode='lines', | |
name='Profit Margin', | |
line=dict(color='green') | |
)) | |
fig_revenue.update_layout(height=350) | |
st.plotly_chart(fig_revenue, use_container_width=True) | |
with col2: | |
st.subheader("User Engagement Analysis") | |
fig_engagement = px.scatter( | |
data, | |
x='Users', | |
y='Engagement', | |
color='Category', | |
size='Revenue', | |
hover_data=['Date'], | |
title='User Engagement Dynamics' | |
) | |
fig_engagement.update_layout(height=350) | |
st.plotly_chart(fig_engagement, use_container_width=True) | |
# Category Performance | |
st.subheader("Category Performance Breakdown") | |
category_performance = data.groupby('Category').agg({ | |
'Revenue': 'sum', | |
'Users': 'sum', | |
'Engagement': 'mean' | |
}).reset_index() | |
fig_category = px.bar( | |
category_performance, | |
x='Category', | |
y='Revenue', | |
color='Engagement', | |
title='Revenue by Category with Engagement Overlay' | |
) | |
st.plotly_chart(fig_category, use_container_width=True) | |
# Bottom Summary | |
st.subheader("Quick Insights") | |
insights_col1, insights_col2 = st.columns(2) | |
with insights_col1: | |
st.metric("Top Performing Category", | |
category_performance.loc[category_performance['Revenue'].idxmax(), 'Category']) | |
with insights_col2: | |
st.metric("Highest Engagement Category", | |
category_performance.loc[category_performance['Engagement'].idxmax(), 'Category']) | |
def render_analytics(): | |
st.header("π Data Analytics") | |
processor = DataProcessor() | |
uploaded_file = st.file_uploader("Upload your CSV data", type=['csv']) | |
if uploaded_file is not None: | |
if processor.load_data(uploaded_file): | |
st.success("Data loaded successfully!") | |
tabs = st.tabs(["Data Preview", "Statistics", "Visualization", "Metrics"]) | |
with tabs[0]: | |
st.subheader("Data Preview") | |
st.dataframe(processor.data.head()) | |
st.info(f"Total rows: {len(processor.data)}, Total columns: {len(processor.data.columns)}") | |
with tabs[1]: | |
st.subheader("Basic Statistics") | |
stats = processor.get_basic_stats() | |
st.write(stats['summary']) | |
st.subheader("Missing Values") | |
st.write(stats['missing_values']) | |
with tabs[2]: | |
st.subheader("Create Visualization") | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
chart_type = st.selectbox( | |
"Select Chart Type", | |
["Line Plot", "Bar Plot", "Scatter Plot", "Box Plot", "Histogram"] | |
) | |
with col2: | |
x_col = st.selectbox("Select X-axis", processor.data.columns) | |
with col3: | |
y_col = st.selectbox("Select Y-axis", processor.numeric_columns) if chart_type != "Histogram" else None | |
color_col = st.selectbox("Select Color Variable (optional)", | |
['None'] + processor.categorical_columns) | |
color_col = None if color_col == 'None' else color_col | |
fig = processor.create_visualization( | |
chart_type, | |
x_col, | |
y_col if y_col else x_col, | |
color_col | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
with tabs[3]: | |
st.subheader("Column Metrics") | |
selected_col = st.selectbox("Select column", processor.numeric_columns) | |
metrics = { | |
'Mean': processor.data[selected_col].mean(), | |
'Median': processor.data[selected_col].median(), | |
'Std Dev': processor.data[selected_col].std(), | |
'Min': processor.data[selected_col].min(), | |
'Max': processor.data[selected_col].max() | |
} | |
cols = st.columns(len(metrics)) | |
for col, (metric, value) in zip(cols, metrics.items()): | |
col.metric(metric, f"{value:.2f}") | |
def render_brainstorm_page(): | |
st.title("Product Brainstorm Hub") | |
manager = BrainstormManager() | |
action = st.sidebar.radio("Action", ["View Products", "Create New Product"]) | |
if action == "Create New Product": | |
basic_info, market_analysis, submitted = manager.generate_product_form() | |
if submitted: | |
product_data = {**basic_info, **market_analysis} | |
insights = manager.analyze_product(product_data) | |
product_id = f"prod_{len(st.session_state.products)}" | |
st.session_state.products[product_id] = { | |
"data": product_data, | |
"insights": insights, | |
"created_at": str(datetime.now()) | |
} | |
st.success("Product added! View insights in the Products tab.") | |
else: | |
if st.session_state.products: | |
for prod_id, product in st.session_state.products.items(): | |
with st.expander(f"π― {product['data']['name']}"): | |
col1, col2 = st.columns(2) | |
with col1: | |
st.subheader("Product Details") | |
st.write(f"Category: {product['data']['category']}") | |
st.write(f"Target: {', '.join(product['data']['target_audience'])}") | |
st.write(f"Description: {product['data']['description']}") | |
with col2: | |
st.subheader("Insights") | |
st.metric("Opportunity Score", f"{product['insights']['market_opportunity']}/10") | |
st.metric("Suggested Price", f"${product['insights']['suggested_price']}") | |
st.write("**Risk Factors:**") | |
for risk in product['insights']['risk_factors']: | |
st.write(f"- {risk}") | |
st.write("**Next Steps:**") | |
for step in product['insights']['next_steps']: | |
st.write(f"- {step}") | |
else: | |
st.info("No products yet. Create one to get started!") | |
def generate_response(self, prompt: str, context: list = None) -> str: | |
if not self.model or not self.tokenizer: | |
return "LLM not initialized. Please check model configuration." | |
# Prepare conversation context | |
if context is None: | |
context = [] | |
# Create full prompt with conversation history | |
full_prompt = "".join([f"{msg['role']}: {msg['content']}\n" for msg in context]) | |
full_prompt += f"user: {prompt}\nassistant: " | |
# Tokenize input | |
input_ids = self.tokenizer(full_prompt, return_tensors="pt").input_ids.to(self.model.device) | |
# Generate response | |
try: | |
output = self.model.generate( | |
input_ids, | |
max_length=500, | |
num_return_sequences=1, | |
no_repeat_ngram_size=2, | |
temperature=0.7, | |
top_p=0.9 | |
) | |
# Decode response | |
response = self.tokenizer.decode(output[0], skip_special_tokens=True) | |
# Extract only the new part of the response | |
response = response[len(full_prompt):].strip() | |
return response | |
except Exception as e: | |
return f"Response generation error: {e}" | |
def render_chat(): | |
st.header("π¬AI Business Mentor") | |
st.title("π€ Prospira AI Business Mentor") | |
iframe_code = """ | |
<iframe | |
src="https://demoorganisation34-chatbot-for-prospira.hf.space" | |
width="100%" height="500" frameborder="0"> | |
</iframe> | |
""" | |
components.html(iframe_code, height=600) | |
def render_home(): | |
st.title("π Welcome to Prospira") | |
st.subheader("π Data-Driven Solutions for Businesses and Creators") | |
st.markdown(""" | |
**Prospira** empowers businesses and creators to enhance their content, products, and marketing strategies using AI-driven insights. | |
### **β¨ Key Features** | |
- **π Performance Analytics:** Real-time insights into business metrics. | |
- **π Competitive Analysis:** Benchmark your business against competitors. | |
- **π‘ Smart Product Ideas:** AI-generated recommendations for future products and content. | |
- **π§ AI Business Mentor:** Personalized AI guidance for strategy and growth. | |
Explore how **Prospira** can help optimize your decision-making and drive success! π‘π | |
""") | |
def main(): | |
st.set_page_config( | |
page_title="Prospira", | |
page_icon="π", | |
layout="centered", | |
initial_sidebar_state="expanded" | |
) | |
# Create a selection box to choose between pages | |
page = st.sidebar.radio("Select a page", ["Home", "Dashboard", "Analytics", "Brainstorm", "Chat"]) | |
if page == "Home": | |
render_home() | |
elif page == "Dashboard": | |
render_dashboard() | |
elif page == "Analytics": | |
render_analytics() | |
elif page == "Brainstorm": | |
render_brainstorm_page() | |
elif page == "Chat": | |
render_chat() | |
if __name__ == "__main__": | |
main() |