import gradio as gr import sqlite3 import bcrypt import pandas as pd import numpy as np import joblib from sklearn.ensemble import RandomForestRegressor from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler from sklearn.compose import ColumnTransformer from sklearn.neighbors import NearestNeighbors import plotly.express as px import time import threading import random import requests import os # Import the os module to access environment variables # Hugging Face API configuration HUGGINGFACE_API_URL = "https://api-inference.huggingface.co/models/google/flan-t5-large" HUGGINGFACE_API_KEY = os.environ["HUGGINGFACE_API_KEY"] # Access the API key from environment variables def huggingface_chatbot(user_input): try: headers = { "Authorization": f"Bearer {HUGGINGFACE_API_KEY}", "Content-Type": "application/json" } data = { "inputs": f"Answer the following question clearly and concisely:\n{user_input}", "parameters": { "max_length": 150, "temperature": 0.3 # Less randomness } } response = requests.post(HUGGINGFACE_API_URL, headers=headers, json=data) response.raise_for_status() return response.json()[0]["generated_text"] except Exception as e: return f"Error: {str(e)}" def huggingface_chatbot(user_input): try: headers = { "Authorization": f"Bearer {HUGGINGFACE_API_KEY}", "Content-Type": "application/json" } data = { "inputs": f"Answer the following question clearly and concisely:\n{user_input}", "parameters": { "max_length": 150, "temperature": 0.3 # Less randomness } } response = requests.post(HUGGINGFACE_API_URL, headers=headers, json=data) response.raise_for_status() return response.json()[0]["generated_text"] except Exception as e: return f"Error: {str(e)}" # Hugging Face Chatbot Function def huggingface_chatbot(user_input): try: headers = { "Authorization": f"Bearer {HUGGINGFACE_API_KEY}", "Content-Type": "application/json" } data = { "inputs": user_input, "parameters": { "max_length": 100, # Adjust as needed "temperature": 0.7 # Adjust as needed } } response = requests.post(HUGGINGFACE_API_URL, headers=headers, json=data) response.raise_for_status() # Raise an error for bad status codes return response.json()[0]["generated_text"] except Exception as e: return f"Error: {str(e)}" # Database setup for user authentication def init_db(): conn = sqlite3.connect("users.db") cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE, password TEXT ) """) conn.commit() conn.close() init_db() def register(username, password): conn = sqlite3.connect("users.db") cursor = conn.cursor() hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt()) try: cursor.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, hashed_pw)) conn.commit() return "✅ Registration Successful! You can now log in." except sqlite3.IntegrityError: return "⚠️ Username already exists. Try another." finally: conn.close() def login(username, password): conn = sqlite3.connect("users.db") cursor = conn.cursor() cursor.execute("SELECT password FROM users WHERE username = ?", (username,)) result = cursor.fetchone() conn.close() if result and bcrypt.checkpw(password.encode(), result[0]): return "✅ Login Successful! Welcome to the marketplace." else: return "❌ Incorrect username or password. Try again." # Load dataset for product lifecycle prediction df_lifecycle = pd.read_csv("ecommerce_product_dataset.csv") # Update this path with the correct one # Preprocessing for product lifecycle prediction categorical_features_lifecycle = ['Category'] numeric_features_lifecycle = ['Price', 'Rating', 'NumReviews', 'StockQuantity', 'Discount'] preprocessor_lifecycle = ColumnTransformer([ ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features_lifecycle), ('num', 'passthrough', numeric_features_lifecycle) ]) # Fit the preprocessor on training data X_lifecycle = df_lifecycle[["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"]] y_lifecycle = df_lifecycle["Sales"] # Target variable X_transformed_lifecycle = preprocessor_lifecycle.fit_transform(X_lifecycle) # Train the model model_lifecycle = RandomForestRegressor(n_estimators=100, random_state=42) model_lifecycle.fit(X_transformed_lifecycle, y_lifecycle) # Save the fitted preprocessor & model joblib.dump(preprocessor_lifecycle, "preprocessor_lifecycle.pkl") joblib.dump(model_lifecycle, "product_lifecycle_model.pkl") # Load trained model and fitted preprocessor model_lifecycle = joblib.load("product_lifecycle_model.pkl") preprocessor_lifecycle = joblib.load("preprocessor_lifecycle.pkl") def preprocess_input_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount): input_df = pd.DataFrame([[Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount]], columns=["Category", "ProductName", "Price", "Rating", "NumReviews", "StockQuantity", "Discount"]) input_processed = preprocessor_lifecycle.transform(input_df) return input_processed def predict_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount): try: input_data = preprocess_input_lifecycle(Category, ProductName, Price, Rating, NumReviews, StockQuantity, Discount) prediction = model_lifecycle.predict(input_data)[0] return f"Predicted Product Lifecycle: {round(prediction, 2)} years" except Exception as e: return f"Error: {str(e)}" # Load dataset for dynamic pricing df_pricing = pd.read_csv("dynamic_pricing_data_5000.csv") # Update this path with the correct one # Encode categorical variables for dynamic pricing label_encoders = {} for col in ["Product Name", "Category", "Demand", "Season"]: le = LabelEncoder() df_pricing[col] = le.fit_transform(df_pricing[col]) label_encoders[col] = le # Scale numerical features for dynamic pricing scaler = StandardScaler() num_cols = ["Base Price", "Competitor Price", "Stock", "Reviews", "Rating", "Discount"] df_pricing[num_cols] = scaler.fit_transform(df_pricing[num_cols]) # Save label encoders and scaler joblib.dump(label_encoders, "label_encoders.pkl") joblib.dump(scaler, "scaler.pkl") # Train model for dynamic pricing X_pricing = df_pricing.drop(columns=["Final Price"]) y_pricing = df_pricing["Final Price"] model_pricing = RandomForestRegressor(n_estimators=100, random_state=42) model_pricing.fit(X_pricing, y_pricing) # Save the trained model joblib.dump(model_pricing, "dynamic_pricing_model.pkl") # Load trained model, scaler, and label encoders model_pricing = joblib.load("dynamic_pricing_model.pkl") scaler = joblib.load("scaler.pkl") label_encoders = joblib.load("label_encoders.pkl") def predict_price(product_name, category, base_price, competitor_price, demand, stock, reviews, rating, season, discount): # Encode categorical features category = label_encoders["Category"].transform([category])[0] demand = label_encoders["Demand"].transform([demand])[0] season = label_encoders["Season"].transform([season])[0] product_name = label_encoders["Product Name"].transform([product_name])[0] # Scale numerical features features = np.array([base_price, competitor_price, stock, reviews, rating, discount]).reshape(1, -1) features = scaler.transform(features) # Combine features final_features = np.concatenate((features.flatten(), [category, demand, season, product_name])).reshape(1, -1) # Predict predicted_price = model_pricing.predict(final_features)[0] return f"Optimal Price: ₹{round(predicted_price, 2)}" # Load dataset for product recommendation df_recommendation = pd.read_csv("synthetic_product_data_5000.csv") # Update this path with the correct one # Preprocessing for product recommendation categorical_features_recommendation = ['product_condition', 'category'] numeric_features_recommendation = ['price'] preprocessor_recommendation = ColumnTransformer( transformers=[ ('cat', OneHotEncoder(), categorical_features_recommendation), ('num', 'passthrough', numeric_features_recommendation) ]) product_features = preprocessor_recommendation.fit_transform(df_recommendation[['product_condition', 'price', 'category']]) # Fit NearestNeighbors model knn = NearestNeighbors(n_neighbors=5) knn.fit(product_features) def recommend_products(category): filtered_df = df_recommendation[df_recommendation['category'] == category] if filtered_df.empty: return "No products found in this category." random_product = random.choice(filtered_df.index) product = product_features[random_product].reshape(1, -1) _, indices = knn.kneighbors(product) recommended = df_recommendation.iloc[indices[0]] recommended = recommended[recommended['category'] == category] return recommended[['product_id', 'product_condition', 'price', 'category']] # Circular Economy Analytics Dashboard def load_data(): return pd.read_csv("synthetic_marketplace_data_2000.csv") def update_live_data(): df = load_data() new_entry = { "Category": np.random.choice(["Electronics", "Plastic", "Metal", "Wood", "Composite"]), "LifecycleYears": round(np.random.uniform(1, 20), 2), "Price": round(np.random.uniform(10, 500), 2), "NumReviews": np.random.randint(0, 1000) } df = df.append(new_entry, ignore_index=True) df.to_csv("synthetic_marketplace_data_2000.csv", index=False) def generate_dashboard(): df = load_data() lifecycle_fig = px.bar(df.groupby('Category')['LifecycleYears'].mean().reset_index(), x='Category', y='LifecycleYears', title='Average Product Lifecycle by Category') price_trend_fig = px.line(df.groupby('Category')['Price'].mean().reset_index(), x='Category', y='Price', title='Average Price Trends by Category') engagement_fig = px.bar(df.groupby('Category')['NumReviews'].sum().reset_index(), x='Category', y='NumReviews', title='Total User Reviews per Category') df['Sustainability Score'] = np.random.uniform(0, 100, len(df)) sustainability_fig = px.scatter(df, x='Price', y='Sustainability Score', color='Category', title='Sustainability Score vs. Product Price') return lifecycle_fig, price_trend_fig, engagement_fig, sustainability_fig # Gradio Interfaces with gr.Blocks() as app: # Add a logo or banner image gr.Markdown("""