marketplace / app.py
tanya17's picture
Update app.py
fdf9ba3 verified
import gradio as gr
import sqlite3
import bcrypt
import pandas as pd
import numpy as np
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.neighbors import NearestNeighbors
import plotly.express as px
import time
import threading
import random
import requests
import os
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
# Load tokenizer and model for Flan-T5
model_name = "google/flan-t5-base"
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")
# Create a pipeline
generator = pipeline("text2text-generation", model=model, tokenizer=tokenizer)
# File to store feedback
FEEDBACK_FILE = "user_feedback.csv"
def huggingface_chatbot(user_input):
try:
result = generator(user_input, max_length=150, temperature=0.7, do_sample=True)
if isinstance(result, list) and "generated_text" in result[0]:
return result[0]["generated_text"]
elif "generated_text" in result:
return result["generated_text"]
elif "text" in result[0]:
return result[0]["text"]
else:
return "⚠️ Could not parse model response."
except Exception as e:
return f"Error: {str(e)}"
# Database setup for user authentication
def init_db():
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE,
password TEXT
)
""")
conn.commit()
conn.close()
init_db()
def register(username, password):
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
try:
cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, hashed_pw))
conn.commit()
return "✅ Registration Successful! You can now log in."
except sqlite3.IntegrityError:
return "⚠️ Username already exists. Try another."
finally:
conn.close()
def login(username, password):
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
cursor.execute("SELECT password FROM users WHERE username = ?", (username,))
result = cursor.fetchone()
conn.close()
if result and bcrypt.checkpw(password.encode(), result[0]):
return "✅ Login Successful! Welcome to the marketplace."
else:
return "❌ Incorrect username or password. Try again."
# Initialize the feedback CSV if it doesn't exist
if not os.path.exists(FEEDBACK_FILE):
pd.DataFrame(columns=[
"Satisfaction", "Useful_Features", "Issues", "Suggestions", "Recommendation", "Name", "Email"
]).to_csv(FEEDBACK_FILE, index=False)
def submit_feedback(satisfaction, features, issues, suggestions, recommendation, name, email):
feedback = pd.read_csv(FEEDBACK_FILE)
new_entry = {
"Satisfaction": satisfaction,
"Useful_Features": ", ".join(features),
"Issues": issues,
"Suggestions": suggestions,
"Recommendation": recommendation,
"Name": name,
"Email": email
}
feedback = pd.concat([feedback, pd.DataFrame([new_entry])], ignore_index=True)
feedback.to_csv(FEEDBACK_FILE, index=False)
return "✅ Thank you for your feedback!"
# Load datasets (using sample data if files don't exist)
try:
df_lifecycle = pd.read_csv("ecommerce_product_dataset.csv")
except:
# Sample data if file not found
df_lifecycle = pd.DataFrame({
"Category": ["Electronics", "Plastic", "Metal", "Wood", "Composite"] * 20,
"ProductName": ["Product " + str(i) for i in range(100)],
"Price": np.random.uniform(10, 500, 100),
"Rating": np.random.uniform(1, 5, 100),
"NumReviews": np.random.randint(0, 1000, 100),
"StockQuantity": np.random.randint(0, 500, 100),
"Discount": np.random.uniform(0, 50, 100),
"Sales": np.random.uniform(1, 20, 100)
})
try:
df_pricing = pd.read_csv("dynamic_pricing_data_5000.csv")
except:
df_pricing = pd.DataFrame({
"Product Name": ["iPhone 13", "Nike Shoes", "Samsung TV", "Adidas Jacket"] * 25,
"Category": ["Electronics", "Fashion", "Electronics", "Fashion"] * 25,
"Base Price": np.random.uniform(100, 1000, 100),
"Competitor Price": np.random.uniform(80, 950, 100),
"Demand": np.random.choice(["Low", "Medium", "High"], 100),
"Stock": np.random.randint(0, 500, 100),
"Reviews": np.random.randint(0, 5000, 100),
"Rating": np.random.uniform(1, 5, 100),
"Season": np.random.choice(["Holiday", "Summer", "Winter", "Off-season"], 100),
"Discount": np.random.uniform(0, 30, 100),
"Final Price": np.random.uniform(50, 1200, 100)
})
try:
df_recommendation = pd.read_csv("synthetic_product_data_5000.csv")
except:
df_recommendation = pd.DataFrame({
"product_id": range(100),
"product_condition": np.random.choice(["New", "Used - Like New", "Used - Good", "Used - Fair"], 100),
"price": np.random.uniform(10, 500, 100),
"category": np.random.choice(["Electronics", "Furniture", "Clothing", "Books", "Toys"], 100)
})
# Preprocessing for product lifecycle prediction
categorical_features_lifecycle = ['Category']
numeric_features_lifecycle = ['Price', 'Rating', 'NumReviews', 'StockQuantity', 'Discount']
preprocessor_lifecycle = ColumnTransformer([
('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features_lifecycle),
('num', 'passthrough', numeric_features_lifecycle)
])
# Fit the preprocessor on training data
X_lifecycle = df_lifecycle[["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"]]
y_lifecycle = df_lifecycle["Sales"] # Target variable
X_transformed_lifecycle = preprocessor_lifecycle.fit_transform(X_lifecycle)
# Train the model
model_lifecycle = RandomForestRegressor(n_estimators=100, random_state=42)
model_lifecycle.fit(X_transformed_lifecycle, y_lifecycle)
def preprocess_input_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount):
input_df = pd.DataFrame([[Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount]],
columns=["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"])
input_processed = preprocessor_lifecycle.transform(input_df)
return input_processed
def predict_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount):
try:
input_data = preprocess_input_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount)
prediction = model_lifecycle.predict(input_data)[0]
return f"Predicted Product Lifecycle: {round(prediction, 2)} years"
except Exception as e:
return f"Error: {str(e)}"
# Encode categorical variables for dynamic pricing
label_encoders = {}
for col in ["Product Name", "Category", "Demand", "Season"]:
le = LabelEncoder()
df_pricing[col] = le.fit_transform(df_pricing[col])
label_encoders[col] = le
# Scale numerical features for dynamic pricing
scaler = StandardScaler()
num_cols = ["Base Price", "Competitor Price", "Stock", "Reviews", "Rating", "Discount"]
df_pricing[num_cols] = scaler.fit_transform(df_pricing[num_cols])
# Train model for dynamic pricing
X_pricing = df_pricing.drop(columns=["Final Price"])
y_pricing = df_pricing["Final Price"]
model_pricing = RandomForestRegressor(n_estimators=100, random_state=42)
model_pricing.fit(X_pricing, y_pricing)
def predict_price(product_name, category, base_price, competitor_price, demand, stock, reviews, rating, season, discount):
try:
# Encode categorical features
category = label_encoders["Category"].transform([category])[0]
demand = label_encoders["Demand"].transform([demand])[0]
season = label_encoders["Season"].transform([season])[0]
product_name = label_encoders["Product Name"].transform([product_name])[0]
# Scale numerical features
features = np.array([base_price, competitor_price, stock, reviews, rating, discount]).reshape(1, -1)
features = scaler.transform(features)
# Combine features
final_features = np.concatenate((features.flatten(), [category, demand, season, product_name])).reshape(1, -1)
# Predict
predicted_price = model_pricing.predict(final_features)[0]
return f"Optimal Price: ₹{round(predicted_price, 2)}"
except Exception as e:
return f"Error: {str(e)}"
# Preprocessing for product recommendation
categorical_features_recommendation = ['product_condition', 'category']
numeric_features_recommendation = ['price']
preprocessor_recommendation = ColumnTransformer(
transformers=[
('cat', OneHotEncoder(), categorical_features_recommendation),
('num', 'passthrough', numeric_features_recommendation)
])
product_features = preprocessor_recommendation.fit_transform(df_recommendation[['product_condition', 'price', 'category']])
# Fit NearestNeighbors model
knn = NearestNeighbors(n_neighbors=5)
knn.fit(product_features)
def recommend_products(category):
try:
filtered_df = df_recommendation[df_recommendation['category'] == category]
if filtered_df.empty:
return "No products found in this category."
random_product = random.choice(filtered_df.index)
product = product_features[random_product].reshape(1, -1)
_, indices = knn.kneighbors(product)
recommended = df_recommendation.iloc[indices[0]]
recommended = recommended[recommended['category'] == category]
return recommended[['product_id', 'product_condition', 'price', 'category']]
except Exception as e:
return f"Error: {str(e)}"
# Circular Economy Analytics Dashboard
def load_data():
try:
return pd.read_csv("synthetic_marketplace_data_2000.csv")
except:
return pd.DataFrame({
"Category": np.random.choice(["Electronics", "Plastic", "Metal", "Wood", "Composite"], 100),
"LifecycleYears": np.random.uniform(1, 20, 100),
"Price": np.random.uniform(10, 500, 100),
"NumReviews": np.random.randint(0, 1000, 100)
})
def update_live_data():
df = load_data()
new_entry = {
"Category": np.random.choice(["Electronics", "Plastic", "Metal", "Wood", "Composite"]),
"LifecycleYears": round(np.random.uniform(1, 20), 2),
"Price": round(np.random.uniform(10, 500), 2),
"NumReviews": np.random.randint(0, 1000)
}
df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)
df.to_csv("synthetic_marketplace_data_2000.csv", index=False)
def generate_dashboard():
df = load_data()
lifecycle_fig = px.bar(df.groupby('Category')['LifecycleYears'].mean().reset_index(),
x='Category', y='LifecycleYears', title='Average Product Lifecycle by Category')
price_trend_fig = px.line(df.groupby('Category')['Price'].mean().reset_index(),
x='Category', y='Price', title='Average Price Trends by Category')
engagement_fig = px.bar(df.groupby('Category')['NumReviews'].sum().reset_index(),
x='Category', y='NumReviews', title='Total User Reviews per Category')
df['Sustainability Score'] = np.random.uniform(0, 100, len(df))
sustainability_fig = px.scatter(df, x='Price', y='Sustainability Score', color='Category',
title='Sustainability Score vs. Product Price')
return lifecycle_fig, price_trend_fig, engagement_fig, sustainability_fig
# Gradio Application
with gr.Blocks(title="Circular Economy Marketplace") as app:
# Add a logo or banner image
gr.Markdown("""
<div style="text-align: center;">
<h1>♻️ Circular Economy Marketplace</h1>
<p>Sustainable product lifecycle management and recommendations</p>
</div>
""")
# Login/Register Tab
with gr.Tab("🔐 Login/Register"):
with gr.Tab("Register"):
reg_username = gr.Textbox(label="Username")
reg_password = gr.Textbox(label="Password", type="password")
reg_btn = gr.Button("Register")
reg_output = gr.Textbox()
reg_btn.click(register, inputs=[reg_username, reg_password], outputs=reg_output)
with gr.Tab("Login"):
log_username = gr.Textbox(label="Username")
log_password = gr.Textbox(label="Password", type="password")
log_btn = gr.Button("Login")
log_output = gr.Textbox()
log_btn.click(login, inputs=[log_username, log_password], outputs=log_output)
# Product Lifecycle Prediction Tab
with gr.Tab("📈 Product Lifecycle"):
gr.Markdown("### Predict the lifecycle of your product")
with gr.Row():
with gr.Column():
lifecycle_inputs = [
gr.Dropdown(["Plastic", "Metal", "Wood", "Composite", "Electronics"], label="Category"),
gr.Textbox(label="Product Name"),
gr.Number(label="Price"),
gr.Slider(1, 5, label="Rating", step=0.1),
gr.Number(label="Number of Reviews"),
gr.Number(label="Stock Quantity"),
gr.Slider(0, 50, label="Discount (%)", step=1)
]
with gr.Column():
lifecycle_output = gr.Textbox(label="Prediction", interactive=False)
lifecycle_btn = gr.Button("Predict Lifecycle", variant="primary")
lifecycle_btn.click(predict_lifecycle, inputs=lifecycle_inputs, outputs=lifecycle_output)
# Dynamic Pricing Tab
with gr.Tab("💰 Dynamic Pricing"):
gr.Markdown("### Get optimal pricing recommendations")
with gr.Row():
with gr.Column():
pricing_inputs = [
gr.Dropdown(["iPhone 13", "Nike Shoes", "Samsung TV", "Adidas Jacket", "Dell Laptop",
"Sony Headphones", "Apple Watch", "LG Refrigerator", "HP Printer", "Bose Speaker"],
label="Product Name"),
gr.Dropdown(["Electronics", "Fashion", "Home Appliances"], label="Category"),
gr.Number(label="Base Price"),
gr.Number(label="Competitor Price"),
gr.Dropdown(["Low", "Medium", "High"], label="Demand Level"),
gr.Number(label="Stock Available"),
gr.Number(label="Number of Reviews"),
gr.Slider(1, 5, label="Rating", step=0.1),
gr.Dropdown(["Holiday", "Summer", "Winter", "Off-season"], label="Season"),
gr.Slider(0, 30, label="Discount (%)", step=1)
]
with gr.Column():
pricing_output = gr.Textbox(label="Optimal Price", interactive=False)
pricing_btn = gr.Button("Calculate Optimal Price", variant="primary")
pricing_btn.click(predict_price, inputs=pricing_inputs, outputs=pricing_output)
# Product Recommendation Tab
with gr.Tab("🛍️ Product Recommendations"):
gr.Markdown("### Discover similar products in our marketplace")
with gr.Row():
with gr.Column():
recommendation_input = gr.Dropdown(
choices=df_recommendation['category'].unique().tolist(),
label="Select Product Category"
)
recommendation_btn = gr.Button("Find Recommendations", variant="primary")
with gr.Column():
recommendation_output = gr.Dataframe(
headers=["ID", "Condition", "Price", "Category"],
datatype=["str", "str", "number", "str"],
interactive=False
)
recommendation_btn.click(recommend_products, inputs=recommendation_input, outputs=recommendation_output)
# Circular Economy Analytics Tab
with gr.Tab("📊 Marketplace Analytics"):
gr.Markdown("### Real-time marketplace insights and trends")
with gr.Row():
dashboard_btn = gr.Button("Refresh Dashboard", variant="primary")
with gr.Row():
dashboard_outputs = [
gr.Plot(label="Product Lifecycle Analytics"),
gr.Plot(label="Dynamic Pricing Insights")
]
with gr.Row():
dashboard_outputs.extend([
gr.Plot(label="User Engagement Trends"),
gr.Plot(label="Sustainability Insights")
])
dashboard_btn.click(generate_dashboard, inputs=[], outputs=dashboard_outputs)
# AI Chatbot Tab
with gr.Tab("💬 AI Assistant"):
gr.Markdown("### Ask our AI assistant anything about circular economy")
with gr.Row():
with gr.Column(scale=3):
chatbot_input = gr.Textbox(
label="Your question",
placeholder="Ask about product lifecycle, pricing, recommendations..."
)
chatbot_btn = gr.Button("Ask", variant="primary")
with gr.Column(scale=4):
chatbot_output = gr.Textbox(label="AI Response", interactive=False)
chatbot_btn.click(huggingface_chatbot, inputs=chatbot_input, outputs=chatbot_output)
# Feedback Tab
with gr.Tab("📝 Feedback"):
gr.Markdown("## We value your feedback!")
gr.Markdown("Help us improve by sharing your experience with our platform.")
with gr.Row():
with gr.Column():
satisfaction = gr.Radio(
["Very Satisfied", "Satisfied", "Neutral", "Dissatisfied", "Very Dissatisfied"],
label="1. Overall Satisfaction"
)
features = gr.CheckboxGroup(
["Lifecycle Prediction", "Dynamic Pricing", "Product Recommendation",
"Marketplace Analytics", "AI Assistant", "Other"],
label="2. Most Useful Features"
)
issues = gr.Textbox(
label="3. Issues Faced (if any)",
placeholder="Describe any problems you encountered..."
)
with gr.Column():
suggestions = gr.Textbox(
label="4. Suggestions for Improvement",
placeholder="How can we make this platform better?"
)
recommendation = gr.Radio(
["Yes", "No", "Maybe"],
label="5. Would you recommend this platform to others?"
)
name = gr.Textbox(label="6. Your Name (Optional)")
email = gr.Textbox(label="7. Email (Optional)")
submit_btn = gr.Button("Submit Feedback", variant="primary")
feedback_output = gr.Textbox(label="Submission Status", interactive=False)
submit_btn.click(
fn=submit_feedback,
inputs=[satisfaction, features, issues, suggestions, recommendation, name, email],
outputs=feedback_output
)
# Simulate real-time data updates
def live_update():
while True:
update_live_data()
time.sleep(5)
threading.Thread(target=live_update, daemon=True).start()
# Launch the app
if __name__ == "__main__":
app.launch(share=False)