# Spaces: Running
import gradio as gr | |
import sqlite3 | |
import bcrypt | |
import pandas as pd | |
import numpy as np | |
import joblib | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler | |
from sklearn.compose import ColumnTransformer | |
from sklearn.neighbors import NearestNeighbors | |
import plotly.express as px | |
import time | |
import threading | |
import random | |
import requests | |
import os # Import the os module to access environment variables | |
# Hugging Face Inference API settings
HUGGINGFACE_API_URL = (
    "https://api-inference.huggingface.co/models/"
    "google/flan-t5-large"
)
# The token is read from the environment; indexing os.environ raises
# KeyError at import time when the variable is not set.
HUGGINGFACE_API_KEY = os.environ["HUGGINGFACE_API_KEY"]
# Hugging Face Chatbot Function
def huggingface_chatbot(user_input):
    """Send ``user_input`` to the hosted Hugging Face model and return its reply.

    This function used to be defined three times in a row. Python keeps only
    the last definition, so the two earlier copies (which wrapped the question
    in an instruction prompt and used max_length=150 / temperature=0.3) never
    ran. They are removed here and the definition that was actually in effect
    is kept, with a request timeout added.

    Args:
        user_input: Text typed by the user; sent verbatim as ``inputs``.

    Returns:
        The ``generated_text`` of the first result, or ``"Error: <details>"``
        if the request or the response parsing fails.
    """
    try:
        headers = {
            "Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
            "Content-Type": "application/json",
        }
        data = {
            "inputs": user_input,
            "parameters": {
                "max_length": 100,  # Adjust as needed
                "temperature": 0.7,  # Adjust as needed
            },
        }
        # Without a timeout, requests waits indefinitely on a stalled
        # connection, which would block the UI callback forever.
        response = requests.post(
            HUGGINGFACE_API_URL,
            headers=headers,
            json=data,
            timeout=30,
        )
        response.raise_for_status()  # Raise an error for bad status codes
        return response.json()[0]["generated_text"]
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"
# Database setup for user authentication
def init_db():
    """Create the ``users`` table in ``users.db`` if it does not exist yet.

    The table has an auto-incrementing ``id``, a unique ``username`` and a
    ``password`` column. Calling this more than once is harmless because of
    ``IF NOT EXISTS``.
    """
    conn = sqlite3.connect("users.db")
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE,
                password TEXT
            )
        """)
        conn.commit()
    finally:
        # Close the connection even when the statement fails.
        conn.close()


init_db()
def register(username, password):
    """Create a new user with a bcrypt-hashed password.

    Args:
        username: Desired user name; must not be blank.
        password: Plain-text password; must not be empty.

    Returns:
        A status message for the UI: success, "already exists" when the
        username violates the UNIQUE constraint, or "required" when either
        field is blank.
    """
    if not username or not username.strip() or not password:
        return "⚠️ Username and password are required."

    # Hash before opening the connection so a hashing failure cannot leak it.
    hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt())

    conn = sqlite3.connect("users.db")
    try:
        conn.execute(
            "INSERT INTO users (username, password) VALUES (?, ?)",
            (username, hashed_pw),
        )
        conn.commit()
        return "✅ Registration Successful! You can now log in."
    except sqlite3.IntegrityError:
        return "⚠️ Username already exists. Try another."
    finally:
        conn.close()
def login(username, password):
    """Check ``username``/``password`` against the stored bcrypt hash.

    Args:
        username: User name to look up.
        password: Plain-text password to verify.

    Returns:
        A success message when the user exists and the password matches the
        stored hash, otherwise the generic "incorrect" message.
    """
    failure = "❌ Incorrect username or password. Try again."
    if username is None or password is None:
        # Nothing to verify; also avoids calling .encode() on None.
        return failure

    conn = sqlite3.connect("users.db")
    try:
        result = conn.execute(
            "SELECT password FROM users WHERE username = ?", (username,)
        ).fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if not result:
        return failure

    stored_hash = result[0]
    if isinstance(stored_hash, str):
        # bcrypt.checkpw needs bytes; tolerate hashes that were stored as text.
        stored_hash = stored_hash.encode()

    if bcrypt.checkpw(password.encode(), stored_hash):
        return "✅ Login Successful! Welcome to the marketplace."
    return failure
# Product lifecycle model: load data, fit preprocessing + regressor, persist, reload
df_lifecycle = pd.read_csv("ecommerce_product_dataset.csv")  # Update this path with the correct one

# Column groups consumed by the preprocessor
categorical_features_lifecycle = ['Category']
numeric_features_lifecycle = ['Price', 'Rating', 'NumReviews', 'StockQuantity', 'Discount']

# One-hot encode the category (unknown categories are ignored at predict
# time) and pass the numeric columns through unchanged.
preprocessor_lifecycle = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features_lifecycle),
        ('num', 'passthrough', numeric_features_lifecycle),
    ]
)

# "ProductName" is selected here but is not listed in either transformer above.
X_lifecycle = df_lifecycle[
    ["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"]
]
# NOTE(review): the target is "Sales", while the prediction is later shown
# to the user as a lifecycle in years — confirm which one is intended.
y_lifecycle = df_lifecycle["Sales"]
X_transformed_lifecycle = preprocessor_lifecycle.fit_transform(X_lifecycle)

# Fit the regressor on the transformed features
model_lifecycle = RandomForestRegressor(n_estimators=100, random_state=42)
model_lifecycle.fit(X_transformed_lifecycle, y_lifecycle)

# Persist the fitted preprocessor and model ...
joblib.dump(preprocessor_lifecycle, "preprocessor_lifecycle.pkl")
joblib.dump(model_lifecycle, "product_lifecycle_model.pkl")

# ... and read them straight back so later code uses the saved artifacts
model_lifecycle = joblib.load("product_lifecycle_model.pkl")
preprocessor_lifecycle = joblib.load("preprocessor_lifecycle.pkl")
def preprocess_input_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount):
    """Turn one set of raw form values into the model's feature row.

    The values are placed in a single-row DataFrame whose columns match the
    training frame, then passed through the fitted ``preprocessor_lifecycle``.

    Returns:
        The transformed single-row feature matrix.
    """
    column_names = ["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"]
    single_row = [[Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount]]
    return preprocessor_lifecycle.transform(pd.DataFrame(single_row, columns=column_names))
def predict_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount):
    """Run the lifecycle model on one product and format the result for the UI.

    Returns:
        ``"Predicted Product Lifecycle: <value> years"`` with the prediction
        rounded to two decimals, or ``"Error: <details>"`` if preprocessing or
        prediction raises.
    """
    try:
        features = preprocess_input_lifecycle(
            Category=Category,
            ProductName=ProductName,
            Price=Price,
            Rating=Rating,
            NumReviews=NumReviews,
            StockQuantity=StockQuantity,
            Discount=Discount,
        )
        # Single input row, so take the first (only) prediction.
        prediction = model_lifecycle.predict(features)[0]
        # NOTE(review): the model in this file is fitted on "Sales"; the
        # "years" wording should be confirmed.
        return f"Predicted Product Lifecycle: {round(prediction, 2)} years"
    except Exception as e:
        return f"Error: {str(e)}"
# Dynamic pricing model: load data, encode and scale features, train, persist, reload
df_pricing = pd.read_csv("dynamic_pricing_data_5000.csv")  # Update this path with the correct one

# One LabelEncoder per categorical column; each column is replaced in place
# by its integer codes and the fitted encoder is kept for prediction time.
label_encoders = {name: LabelEncoder() for name in ("Product Name", "Category", "Demand", "Season")}
for col, le in label_encoders.items():
    df_pricing[col] = le.fit_transform(df_pricing[col])

# Standardize the numeric columns in place
num_cols = ["Base Price", "Competitor Price", "Stock", "Reviews", "Rating", "Discount"]
scaler = StandardScaler()
df_pricing[num_cols] = scaler.fit_transform(df_pricing[num_cols])

# Persist the encoders and the scaler
joblib.dump(label_encoders, "label_encoders.pkl")
joblib.dump(scaler, "scaler.pkl")

# Every column except the target is a feature, in the frame's own column order
y_pricing = df_pricing["Final Price"]
X_pricing = df_pricing.drop(columns=["Final Price"])
model_pricing = RandomForestRegressor(n_estimators=100, random_state=42)
model_pricing.fit(X_pricing, y_pricing)

# Persist the trained model
joblib.dump(model_pricing, "dynamic_pricing_model.pkl")

# Reload all three artifacts so prediction uses what was saved
model_pricing = joblib.load("dynamic_pricing_model.pkl")
scaler = joblib.load("scaler.pkl")
label_encoders = joblib.load("label_encoders.pkl")
def predict_price(product_name, category, base_price, competitor_price, demand, stock, reviews, rating, season, discount):
    """Predict the optimal price for one product and format it for the UI.

    The numeric inputs are scaled with the fitted ``scaler``, the categorical
    inputs are encoded with the fitted ``label_encoders``, and the resulting
    features are handed to ``model_pricing`` in the column order the model
    was trained with.

    Previously the feature vector was a bare array in a hard-coded order
    (numeric columns first, then Category, Demand, Season, Product Name).
    The model is trained on the dataset's own column order, so any
    difference between the two silently fed values into the wrong features.
    The row is now labelled and reordered to the model's recorded feature
    names; the old order is used only when the model has none.

    Returns:
        ``"Optimal Price: ₹<value>"`` rounded to two decimals, or
        ``"Error: <details>"`` if encoding, scaling or prediction fails
        (for example an unseen category label or a blank number field).
    """
    numeric_names = ["Base Price", "Competitor Price", "Stock", "Reviews", "Rating", "Discount"]
    categorical_inputs = {
        "Category": category,
        "Demand": demand,
        "Season": season,
        "Product Name": product_name,
    }
    try:
        # Scale the numeric inputs, labelled with the same column names and
        # order the scaler was fitted on.
        numeric_frame = pd.DataFrame(
            [[base_price, competitor_price, stock, reviews, rating, discount]],
            columns=numeric_names,
        )
        scaled_values = scaler.transform(numeric_frame)[0]
        feature_row = dict(zip(numeric_names, scaled_values))

        # Encode each categorical input with the encoder fitted for its column.
        for column, raw_value in categorical_inputs.items():
            feature_row[column] = label_encoders[column].transform([raw_value])[0]

        # Use the column order recorded at fit time when available.
        fallback_order = numeric_names + list(categorical_inputs)
        expected_order = list(getattr(model_pricing, "feature_names_in_", fallback_order))
        missing = [name for name in expected_order if name not in feature_row]
        if missing:
            raise ValueError(f"model expects features this form does not provide: {missing}")

        final_features = pd.DataFrame([feature_row])[expected_order]
        predicted_price = model_pricing.predict(final_features)[0]
        return f"Optimal Price: ₹{round(predicted_price, 2)}"
    except Exception as e:
        # UI boundary: report the failure as text, like predict_lifecycle does.
        return f"Error: {str(e)}"
# Product recommendation: load data, build feature matrix, fit nearest-neighbour index
df_recommendation = pd.read_csv("synthetic_product_data_5000.csv")  # Update this path with the correct one

# Column groups consumed by the preprocessor
categorical_features_recommendation = ['product_condition', 'category']
numeric_features_recommendation = ['price']

# One-hot encode condition and category; pass the price through unchanged
preprocessor_recommendation = ColumnTransformer([
    ('cat', OneHotEncoder(), categorical_features_recommendation),
    ('num', 'passthrough', numeric_features_recommendation),
])
product_features = preprocessor_recommendation.fit_transform(
    df_recommendation[['product_condition', 'price', 'category']]
)

# Index all products so the 5 nearest neighbours of any row can be queried
knn = NearestNeighbors(n_neighbors=5).fit(product_features)
def recommend_products(category):
    """Recommend products similar to a random product of ``category``.

    A random product from the category is used as the seed, its nearest
    neighbours are looked up in ``knn``, and neighbours from other
    categories are dropped.

    The seed is chosen by row position rather than by index label.
    ``product_features`` and ``DataFrame.iloc`` are positional, so using
    ``filtered_df.index`` labels only worked while the frame had its default
    0..n-1 index.

    Args:
        category: Category value to match against the ``category`` column.

    Returns:
        A DataFrame with ``product_id``, ``product_condition``, ``price`` and
        ``category`` for the matching neighbours, or the string
        ``"No products found in this category."`` when nothing matches.
    """
    in_category = (df_recommendation['category'] == category).to_numpy()
    # Plain list of row positions, suitable for random.choice.
    positions = np.flatnonzero(in_category).tolist()
    if not positions:
        return "No products found in this category."

    seed_position = random.choice(positions)
    seed_vector = product_features[seed_position].reshape(1, -1)
    _, indices = knn.kneighbors(seed_vector)

    recommended = df_recommendation.iloc[indices[0]]
    recommended = recommended[recommended['category'] == category]
    return recommended[['product_id', 'product_condition', 'price', 'category']]
# Circular Economy Analytics Dashboard
def load_data():
    """Read the marketplace CSV from disk and return it as a DataFrame."""
    marketplace_df = pd.read_csv("synthetic_marketplace_data_2000.csv")
    return marketplace_df
def update_live_data():
    """Append one randomly generated row to the marketplace CSV.

    Loads the current data, adds a synthetic entry (random category,
    lifecycle of 1-20 years, price of 10-500, review count of 0-999) and
    writes the whole frame back to ``synthetic_marketplace_data_2000.csv``.
    """
    df = load_data()
    new_entry = {
        "Category": np.random.choice(["Electronics", "Plastic", "Metal", "Wood", "Composite"]),
        "LifecycleYears": round(np.random.uniform(1, 20), 2),
        "Price": round(np.random.uniform(10, 500), 2),
        "NumReviews": np.random.randint(0, 1000),
    }
    # DataFrame.append was removed in pandas 2.0; pd.concat with a one-row
    # frame is the supported replacement and works on older versions too.
    df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True)
    df.to_csv("synthetic_marketplace_data_2000.csv", index=False)
def generate_dashboard():
    """Build the four analytics figures from the current marketplace data.

    Returns:
        A tuple of Plotly figures: mean lifecycle per category (bar), mean
        price per category (line), total reviews per category (bar), and a
        scatter of price against sustainability score.
    """
    df = load_data()
    by_category = df.groupby('Category')

    mean_lifecycle = by_category['LifecycleYears'].mean().reset_index()
    lifecycle_fig = px.bar(
        mean_lifecycle,
        x='Category',
        y='LifecycleYears',
        title='Average Product Lifecycle by Category',
    )

    mean_price = by_category['Price'].mean().reset_index()
    price_trend_fig = px.line(
        mean_price,
        x='Category',
        y='Price',
        title='Average Price Trends by Category',
    )

    total_reviews = by_category['NumReviews'].sum().reset_index()
    engagement_fig = px.bar(
        total_reviews,
        x='Category',
        y='NumReviews',
        title='Total User Reviews per Category',
    )

    # The score is freshly drawn uniform random noise in [0, 100) on every
    # call; it is not derived from the data.
    df['Sustainability Score'] = np.random.uniform(0, 100, len(df))
    sustainability_fig = px.scatter(
        df,
        x='Price',
        y='Sustainability Score',
        color='Category',
        title='Sustainability Score vs. Product Price',
    )

    return lifecycle_fig, price_trend_fig, engagement_fig, sustainability_fig
# Gradio Interfaces
with gr.Blocks() as app:
    # Banner image shown above the title
    gr.Markdown(
        "\n"
        '<div style="text-align: center;">\n'
        '<img src="https://via.placeholder.com/800x200.png?text=Circular+Economy+Marketplace" alt="Banner" style="width: 100%; max-width: 800px;">\n'
        "</div>\n"
    )
    gr.Markdown("# 🔐 Circular Economy Marketplace")

    # Login/Register Tab
    # NOTE(review): "Register" and "Login" are assumed to be sub-tabs nested
    # inside this tab — confirm against the running app.
    with gr.Tab("Login/Register"):
        with gr.Tab("Register"):
            reg_username = gr.Textbox(label="Username")
            reg_password = gr.Textbox(label="Password", type="password")
            reg_btn = gr.Button("Register")
            reg_output = gr.Textbox()
            reg_btn.click(fn=register, inputs=[reg_username, reg_password], outputs=reg_output)

        with gr.Tab("Login"):
            log_username = gr.Textbox(label="Username")
            log_password = gr.Textbox(label="Password", type="password")
            log_btn = gr.Button("Login")
            log_output = gr.Textbox()
            log_btn.click(fn=login, inputs=[log_username, log_password], outputs=log_output)

    # Product Lifecycle Prediction Tab
    with gr.Tab("Product Lifecycle Prediction"):
        # Component order must match predict_lifecycle's parameter order
        lifecycle_inputs = [
            gr.Dropdown(choices=["Plastic", "Metal", "Wood", "Composite", "Electronics"], label="Category"),
            gr.Textbox(label="Product Name"),
            gr.Number(label="Price"),
            gr.Number(label="Rating"),
            gr.Number(label="NumReviews"),
            gr.Number(label="StockQuantity"),
            gr.Number(label="Discount"),
        ]
        lifecycle_output = gr.Textbox(label="Prediction")
        lifecycle_btn = gr.Button("Predict")
        lifecycle_btn.click(fn=predict_lifecycle, inputs=lifecycle_inputs, outputs=lifecycle_output)

    # Dynamic Pricing Tab
    with gr.Tab("Dynamic Pricing"):
        # Component order must match predict_price's parameter order
        pricing_inputs = [
            gr.Dropdown(
                choices=[
                    "iPhone 13",
                    "Nike Shoes",
                    "Samsung TV",
                    "Adidas Jacket",
                    "Dell Laptop",
                    "Sony Headphones",
                    "Apple Watch",
                    "LG Refrigerator",
                    "HP Printer",
                    "Bose Speaker",
                ],
                label="Product Name",
            ),
            gr.Dropdown(choices=["Electronics", "Fashion", "Home Appliances"], label="Category"),
            gr.Number(label="Base Price"),
            gr.Number(label="Competitor Price"),
            gr.Dropdown(choices=["Low", "Medium", "High"], label="Demand"),
            gr.Number(label="Stock"),
            gr.Number(label="Reviews"),
            gr.Number(label="Rating"),
            gr.Dropdown(choices=["Holiday", "Summer", "Winter", "Off-season"], label="Season"),
            gr.Number(label="Discount (%)"),
        ]
        pricing_output = gr.Textbox(label="Predicted Price")
        pricing_btn = gr.Button("Predict")
        pricing_btn.click(fn=predict_price, inputs=pricing_inputs, outputs=pricing_output)

    # Product Recommendation Tab
    with gr.Tab("Product Recommendation"):
        # Choices come from the categories present in the loaded dataset
        recommendation_input = gr.Dropdown(
            choices=df_recommendation['category'].unique().tolist(),
            label="Select Product Category",
        )
        recommendation_output = gr.Dataframe()
        recommendation_btn = gr.Button("Recommend")
        recommendation_btn.click(fn=recommend_products, inputs=recommendation_input, outputs=recommendation_output)

    # Circular Economy Analytics Tab
    with gr.Tab("Circular Economy Analytics"):
        # One plot per figure returned by generate_dashboard, in the same order
        dashboard_outputs = [
            gr.Plot(label="Product Lifecycle Analytics"),
            gr.Plot(label="Dynamic Pricing Insights"),
            gr.Plot(label="User Engagement Trends"),
            gr.Plot(label="Sustainability & Recycling Insights"),
        ]
        dashboard_btn = gr.Button("Generate Dashboard")
        dashboard_btn.click(fn=generate_dashboard, inputs=[], outputs=dashboard_outputs)

    # AI Chatbot Tab
    with gr.Tab("AI Chatbot"):
        gr.Markdown(
            "\n"
            '<div style="text-align: center;">\n'
            '<img src="https://via.placeholder.com/400x200.png?text=AI+Chatbot" alt="Chatbot" style="width: 100%; max-width: 400px;">\n'
            "</div>\n"
        )
        chatbot_input = gr.Textbox(label="Ask me anything about circular economy, product lifecycle, dynamic pricing, and recommendations!")
        chatbot_output = gr.Textbox(label="AI Response")
        chatbot_btn = gr.Button("Ask")
        chatbot_btn.click(fn=huggingface_chatbot, inputs=chatbot_input, outputs=chatbot_output)
# Simulate real-time data updates
def live_update():
    """Append a synthetic row to the marketplace data every five seconds, forever."""
    while True:
        update_live_data()
        time.sleep(5)


# Run the updater in the background; as a daemon thread it does not keep
# the process alive once the main thread exits.
_live_update_thread = threading.Thread(target=live_update, daemon=True)
_live_update_thread.start()

# Launch the app
app.launch()