import gradio as gr
import numpy as np
import json
import re
import random
from typing import Dict, List, Tuple
import os
import time
import matplotlib.pyplot as plt
from datetime import datetime
# Core components: activation/loss functions, layers, the network, text
# processing, and the Chatbot wrapper, followed by the Gradio interface.
class ActivationFunctions:
    """Static collection of activation functions and their derivatives."""

    @staticmethod
    def sigmoid(z: np.ndarray) -> np.ndarray:
        """Sigmoid activation function."""
        z = np.clip(z, -500, 500)  # Prevent overflow in exp
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def sigmoid_derivative(z: np.ndarray) -> np.ndarray:
        """Derivative of the sigmoid function."""
        s = ActivationFunctions.sigmoid(z)
        return s * (1 - s)

    @staticmethod
    def relu(z: np.ndarray) -> np.ndarray:
        """ReLU activation function."""
        return np.maximum(0, z)

    @staticmethod
    def relu_derivative(z: np.ndarray) -> np.ndarray:
        """Derivative of the ReLU function."""
        return np.where(z > 0, 1, 0)

    @staticmethod
    def softmax(z: np.ndarray) -> np.ndarray:
        """Softmax activation function (column-wise, numerically stabilized)."""
        # Subtract the per-column max for numerical stability
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / exp_z.sum(axis=0, keepdims=True)
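

# Sanity check (not executed): softmax maps a column of logits to a
# probability distribution, e.g.
#   ActivationFunctions.softmax(np.array([[2.0], [1.0], [0.1]]))
#   -> approximately [[0.659], [0.242], [0.099]]; each column sums to 1.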
class LossFunctions:
    """Static collection of loss functions and their derivatives."""

    @staticmethod
    def mse(output: np.ndarray, target: np.ndarray) -> float:
        """Mean Squared Error loss."""
        return np.mean((output - target) ** 2)

    @staticmethod
    def mse_derivative(output: np.ndarray, target: np.ndarray) -> np.ndarray:
        """Derivative of MSE loss."""
        return 2 * (output - target) / output.size

    @staticmethod
    def cross_entropy(output: np.ndarray, target: np.ndarray) -> float:
        """Cross-entropy loss for multi-class classification, averaged over the batch."""
        epsilon = 1e-15
        output = np.clip(output, epsilon, 1 - epsilon)
        return -np.sum(target * np.log(output)) / output.shape[1]

    @staticmethod
    def cross_entropy_derivative(output: np.ndarray, target: np.ndarray) -> np.ndarray:
        """Derivative of cross-entropy loss with respect to the output."""
        epsilon = 1e-15
        output = np.clip(output, epsilon, 1 - epsilon)
        return -target / output / output.shape[1]
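

# Sanity check (not executed): for a single one-hot target, cross-entropy
# reduces to -log(p) of the true class, e.g.
#   output = np.array([[0.9], [0.1]]); target = np.array([[1.0], [0.0]])
#   LossFunctions.cross_entropy(output, target) -> -log(0.9) ≈ 0.1054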
class Layer:
    """Base class for neural network layers."""

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """Forward pass through the layer."""
        raise NotImplementedError

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backward pass through the layer."""
        raise NotImplementedError

    def update(self, learning_rate: float) -> None:
        """Update layer parameters (no-op for parameterless layers)."""
        pass

    def get_parameters(self) -> List:
        """Get layer parameters."""
        return []
class DenseLayer(Layer):
    """Fully connected layer with improved numerical stability."""

    def __init__(self, input_size: int, output_size: int, activation: str = "sigmoid"):
        """Initialize the dense layer with stable parameters."""
        self.input_size = input_size
        self.output_size = output_size
        # Xavier/Glorot initialization with a small scale factor to
        # prevent exploding gradients
        self.weights = np.random.randn(output_size, input_size) * np.sqrt(
            1 / (input_size + output_size)
        )
        self.biases = np.zeros((output_size, 1))
        # Set activation function
        if activation == "sigmoid":
            self.activation_fn = ActivationFunctions.sigmoid
            self.activation_derivative = ActivationFunctions.sigmoid_derivative
        elif activation == "relu":
            self.activation_fn = ActivationFunctions.relu
            self.activation_derivative = ActivationFunctions.relu_derivative
        elif activation == "softmax":
            self.activation_fn = ActivationFunctions.softmax
            self.activation_derivative = None  # Handled jointly with cross-entropy
        else:
            raise ValueError(f"Unsupported activation function: {activation}")
        self.activation_name = activation
        # Cache for backward pass
        self.inputs = None
        self.z = None
        self.output = None
        # Gradients
        self.dW = None
        self.db = None

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """Forward pass through the layer with improved numerical stability."""
        self.inputs = inputs
        self.z = np.dot(self.weights, inputs) + self.biases
        # Clip pre-activations to prevent overflow in sigmoid
        if self.activation_name == "sigmoid":
            self.z = np.clip(self.z, -15, 15)
        self.output = self.activation_fn(self.z)
        # Keep softmax outputs away from exact zeros and ones
        if self.activation_name == "softmax":
            epsilon = 1e-10
            self.output = np.clip(self.output, epsilon, 1.0 - epsilon)
        return self.output

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backward pass through the layer with gradient clipping."""
        if self.activation_name == "softmax":
            # Special case: the softmax + cross-entropy gradient arrives
            # pre-combined (see NeuralNetwork.backward) and passes through
            delta = grad
        else:
            delta = grad * self.activation_derivative(self.z)
        # Compute gradients
        self.dW = np.dot(delta, self.inputs.T)
        self.db = np.sum(delta, axis=1, keepdims=True)
        # Clip gradients to prevent exploding gradients
        max_grad_norm = 5.0
        self.dW = np.clip(self.dW, -max_grad_norm, max_grad_norm)
        self.db = np.clip(self.db, -max_grad_norm, max_grad_norm)
        # Gradient to pass to the previous layer
        return np.dot(self.weights.T, delta)

    def update(self, learning_rate: float) -> None:
        """Update parameters using gradient descent with weight decay."""
        # Small L2 weight decay to discourage overfitting
        weight_decay = 1e-4
        weight_decay_term = weight_decay * self.weights
        self.weights -= learning_rate * (self.dW + weight_decay_term)
        self.biases -= learning_rate * self.db
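

# Shape convention: data flows column-major, i.e. inputs are (input_size, batch)
# and weights are (output_size, input_size), so for example
# DenseLayer(3, 2).forward(np.ones((3, 5))) returns an array of shape (2, 5).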
class DropoutLayer(Layer):
    """Dropout layer for regularization."""

    def __init__(self, dropout_rate: float = 0.5):
        """Initialize the dropout layer."""
        self.dropout_rate = dropout_rate
        self.mask = None

    def forward(self, inputs: np.ndarray, training: bool = True) -> np.ndarray:
        """Forward pass; the mask is applied only during training."""
        if not training:
            return inputs
        # Create dropout mask: zero out units, scale survivors
        self.mask = np.random.binomial(1, 1 - self.dropout_rate, size=inputs.shape) / (
            1 - self.dropout_rate
        )
        return inputs * self.mask

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backward pass through the layer."""
        return grad * self.mask
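

# Note: this is "inverted dropout" -- surviving activations are scaled by
# 1 / (1 - dropout_rate) during training so the expected activation matches
# inference; e.g. with dropout_rate=0.5, kept units are doubled.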
class NeuralNetwork:
    """Neural network with multiple layers."""

    def __init__(self):
        """Initialize the neural network."""
        self.layers = []
        self.loss_fn = None
        self.loss_derivative = None
        self.loss_name = None

    def add(self, layer: Layer) -> None:
        """Add a layer to the network."""
        self.layers.append(layer)

    def set_loss(self, loss_type: str) -> None:
        """Set the loss function."""
        if loss_type == "mse":
            self.loss_fn = LossFunctions.mse
            self.loss_derivative = LossFunctions.mse_derivative
        elif loss_type == "cross_entropy":
            self.loss_fn = LossFunctions.cross_entropy
            self.loss_derivative = LossFunctions.cross_entropy_derivative
        else:
            raise ValueError(f"Unsupported loss function: {loss_type}")
        self.loss_name = loss_type  # Remembered so save() can persist it
    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """Forward pass through the network."""
        output = x
        for layer in self.layers:
            if isinstance(layer, DropoutLayer):
                output = layer.forward(output, training)
            else:
                output = layer.forward(output)
        return output

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute the loss."""
        return self.loss_fn(y_pred, y_true)

    def backward(self, y_pred: np.ndarray, y_true: np.ndarray) -> None:
        """Backward pass through the network."""
        last_layer = self.layers[-1] if self.layers else None
        if (
            self.loss_name == "cross_entropy"
            and isinstance(last_layer, DenseLayer)
            and last_layer.activation_name == "softmax"
        ):
            # Softmax + cross-entropy combine into the simple logit gradient
            # (y_pred - y_true) / batch_size; DenseLayer.backward passes this
            # through unchanged for softmax layers.
            grad = (y_pred - y_true) / y_pred.shape[1]
        else:
            # Initial gradient from the loss function
            grad = self.loss_derivative(y_pred, y_true)
        # Propagate the gradient through layers in reverse order
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def update(self, learning_rate: float) -> None:
        """Update network parameters."""
        for layer in self.layers:
            layer.update(learning_rate)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Make predictions (dropout disabled)."""
        return self.forward(x, training=False)
    @classmethod
    def load(cls, filename: str) -> "NeuralNetwork":
        """Load a model from a file."""
        with open(filename, "r") as f:
            model_data = json.load(f)
        network = cls()
        network.set_loss(model_data.get("loss_type", "cross_entropy"))
        for layer_data in model_data["layers"]:
            if layer_data["type"] == "dense":
                layer = DenseLayer(
                    layer_data["input_size"],
                    layer_data["output_size"],
                    layer_data["activation"],
                )
                layer.weights = np.array(layer_data["weights"])
                layer.biases = np.array(layer_data["biases"])
                network.add(layer)
            elif layer_data["type"] == "dropout":
                layer = DropoutLayer(layer_data["dropout_rate"])
                network.add(layer)
        return network
    def save(self, filename: str) -> None:
        """Save the model to a file."""
        # Persist the loss type so load() can restore it
        model_data = {"loss_type": self.loss_name or "cross_entropy", "layers": []}
        for layer in self.layers:
            if isinstance(layer, DenseLayer):
                model_data["layers"].append(
                    {
                        "type": "dense",
                        "input_size": layer.input_size,
                        "output_size": layer.output_size,
                        "activation": layer.activation_name,
                        "weights": layer.weights.tolist(),
                        "biases": layer.biases.tolist(),
                    }
                )
            elif isinstance(layer, DropoutLayer):
                model_data["layers"].append(
                    {"type": "dropout", "dropout_rate": layer.dropout_rate}
                )
        with open(filename, "w") as f:
            json.dump(model_data, f)
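

# Usage sketch (not executed): build, fit one step, and round-trip to disk.
#   net = NeuralNetwork()
#   net.add(DenseLayer(4, 8, "relu"))
#   net.add(DenseLayer(8, 3, "softmax"))
#   net.set_loss("cross_entropy")
#   y_pred = net.forward(x)               # x: (4, batch_size)
#   net.backward(y_pred, y_true)          # y_true: one-hot, (3, batch_size)
#   net.update(learning_rate=0.01)
#   net.save("model.json"); restored = NeuralNetwork.load("model.json")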
class TextProcessor:
    """Class for processing text data."""

    def __init__(self):
        """Initialize the text processor."""
        self.vocabulary = []
        self.vocabulary_size = 0

    def tokenize(self, sentence: str) -> List[str]:
        """Lowercase a sentence and split it into word tokens."""
        return re.findall(r"\w+", sentence.lower())

    def build_vocabulary(self, sentences: List[str]) -> None:
        """Build the vocabulary from a list of sentences."""
        vocabulary = set()
        for sentence in sentences:
            tokens = self.tokenize(sentence)
            vocabulary.update(tokens)
        self.vocabulary = sorted(vocabulary)
        self.vocabulary_size = len(self.vocabulary)

    def sentence_to_bow(self, sentence: str) -> np.ndarray:
        """Convert a sentence to a binary bag-of-words column vector."""
        tokens = self.tokenize(sentence)
        vector = np.zeros((self.vocabulary_size, 1))
        for token in tokens:
            if token in self.vocabulary:
                idx = self.vocabulary.index(token)
                vector[idx, 0] = 1
        return vector
    def save(self, filename: str) -> None:
        """Save the text processor to a file."""
        processor_data = {
            "vocabulary": self.vocabulary,
            "vocabulary_size": self.vocabulary_size,
        }
        with open(filename, "w") as f:
            json.dump(processor_data, f)
    @classmethod
    def load(cls, filename: str) -> "TextProcessor":
        """Load a text processor from a file."""
        with open(filename, "r") as f:
            processor_data = json.load(f)
        processor = cls()
        processor.vocabulary = processor_data["vocabulary"]
        processor.vocabulary_size = processor_data["vocabulary_size"]
        return processor
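

# Example (not executed): tokenization and bag-of-words encoding.
#   tp = TextProcessor()
#   tp.build_vocabulary(["Hello there", "thank you"])   # 4-word vocabulary
#   tp.tokenize("Hello, world!")    -> ["hello", "world"]
#   tp.sentence_to_bow("thank you") -> (4, 1) column vector with 1s at the
#                                      indices of "thank" and "you"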
class Chatbot:
    """Neural-network-based chatbot."""

    def __init__(self):
        """Initialize the chatbot."""
        self.intents = {}
        self.text_processor = TextProcessor()
        self.model = NeuralNetwork()
        self.intent_names = []
        self.confidence_threshold = 0.5
        self.default_response = "I'm not sure I understand. Could you rephrase that?"
        self.training_history = None

    def load_intents(self, intents_data: Dict) -> None:
        """Load intents data and build the vocabulary from their patterns."""
        self.intents = intents_data
        self.intent_names = list(self.intents.keys())
        # Collect all patterns for building the vocabulary
        all_patterns = []
        for intent in self.intents.values():
            all_patterns.extend(intent["patterns"])
        self.text_processor.build_vocabulary(all_patterns)

    def load_intents_from_file(self, filename: str) -> None:
        """Load intents from a JSON file."""
        with open(filename, "r") as f:
            intents_data = json.load(f)
        self.load_intents(intents_data)

    def save_intents(self, filename: str) -> None:
        """Save intents to a JSON file."""
        with open(filename, "w") as f:
            json.dump(self.intents, f, indent=4)

    def load_model(self, filename: str) -> None:
        """Load a model from a file."""
        self.model = NeuralNetwork.load(filename)

    def save_model(self, filename: str) -> None:
        """Save the model along with its text processor and intent metadata."""
        self.model.save(filename)
        # Also save the text processor
        self.text_processor.save(filename.replace(".json", "_processor.json"))
        # Save intent names and response settings
        with open(filename.replace(".json", "_intents.json"), "w") as f:
            json.dump(
                {
                    "intent_names": self.intent_names,
                    "confidence_threshold": self.confidence_threshold,
                    "default_response": self.default_response,
                },
                f,
            )
    def build_model(
        self, hidden_layers: List[int] = [8], dropout_rate: float = 0.0
    ) -> None:
        """Build the neural network model."""
        # Input layer size is the vocabulary size
        input_size = self.text_processor.vocabulary_size
        # Output layer size is the number of intents
        output_size = len(self.intent_names)
        if output_size == 0:
            raise ValueError("No intents loaded. Please load intents first.")
        # Create the model
        self.model = NeuralNetwork()
        # First hidden layer
        self.model.add(DenseLayer(input_size, hidden_layers[0], "relu"))
        if dropout_rate > 0:
            self.model.add(DropoutLayer(dropout_rate))
        # Additional hidden layers
        for i in range(1, len(hidden_layers)):
            self.model.add(DenseLayer(hidden_layers[i - 1], hidden_layers[i], "relu"))
            if dropout_rate > 0:
                self.model.add(DropoutLayer(dropout_rate))
        # Output layer with softmax activation for classification
        self.model.add(DenseLayer(hidden_layers[-1], output_size, "softmax"))
        # Cross-entropy loss for classification
        self.model.set_loss("cross_entropy")
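
    # Example: with a 100-word vocabulary, hidden_layers=[32, 16] and 4
    # intents, build_model yields 100 -> 32 -> 16 -> 4 (softmax), with an
    # optional DropoutLayer after each hidden layer.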
    def train(
        self,
        epochs: int = 1000,
        learning_rate: float = 0.01,
        verbose: bool = True,
    ) -> Dict:
        """Train the model with full-batch gradient descent and stability fixes."""
        # Prepare training data
        X_train = []
        y_train = []
        for idx, intent in enumerate(self.intent_names):
            for pattern in self.intents[intent]["patterns"]:
                # Convert pattern to bag-of-words
                X_train.append(self.text_processor.sentence_to_bow(pattern))
                # Create one-hot encoded target
                target = np.zeros((len(self.intent_names), 1))
                target[idx, 0] = 1
                y_train.append(target)
        # Stack columns: X_train becomes (vocab_size, n_patterns)
        X_train = np.hstack(X_train)
        y_train = np.hstack(y_train)
        # Training history
        history = {"loss": [], "accuracy": []}
        # Gradient clipping bound to prevent exploding gradients
        max_grad_norm = 1.0
        # Training loop
        for epoch in range(epochs):
            # Forward pass
            outputs = self.model.forward(X_train)
            # Clip outputs to prevent log(0)
            epsilon = 1e-10
            outputs = np.clip(outputs, epsilon, 1.0 - epsilon)
            # Compute loss
            loss = self.model.compute_loss(outputs, y_train)
            # Stop if the loss diverged to NaN
            if np.isnan(loss):
                if verbose:
                    print(f"NaN loss detected at epoch {epoch + 1}. Stopping training.")
                if epoch > 0:
                    # Keep the history accumulated so far
                    break
                # No usable history at all
                return {"loss": [0], "accuracy": [0]}
            # Backward pass
            self.model.backward(outputs, y_train)
            # Apply gradient clipping to each layer
            for layer in self.model.layers:
                if hasattr(layer, "dW") and layer.dW is not None:
                    layer.dW = np.clip(layer.dW, -max_grad_norm, max_grad_norm)
                if hasattr(layer, "db") and layer.db is not None:
                    layer.db = np.clip(layer.db, -max_grad_norm, max_grad_norm)
            # Update parameters
            self.model.update(learning_rate)
            # Compute accuracy
            predictions = np.argmax(outputs, axis=0)
            targets = np.argmax(y_train, axis=0)
            accuracy = np.mean(predictions == targets)
            # Save history (as Python floats so it stays JSON-serializable)
            history["loss"].append(float(loss))
            history["accuracy"].append(float(accuracy))
            # Print progress
            if verbose and (epoch + 1) % 100 == 0:
                print(
                    f"Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}"
                )
        self.training_history = history
        return history
    def predict(self, sentence: str) -> Tuple[str, float]:
        """Predict the intent of a sentence."""
        # Convert to bag-of-words
        bow = self.text_processor.sentence_to_bow(sentence)
        # Get prediction
        prediction = self.model.predict(bow)
        # Get predicted intent and confidence
        intent_idx = np.argmax(prediction)
        confidence = prediction[intent_idx, 0]
        return self.intent_names[intent_idx], confidence

    def get_response(self, sentence: str) -> Tuple[str, str, float]:
        """Get a response for a user input."""
        intent, confidence = self.predict(sentence)
        # Fall back to the default response below the confidence threshold
        if confidence < self.confidence_threshold:
            return "unknown", self.default_response, confidence
        # Pick a random response for the predicted intent
        responses = self.intents[intent]["responses"]
        response = random.choice(responses)
        return intent, response, confidence
    def plot_training_history(self, history: Dict = None) -> None:
        """Plot the training history."""
        if history is None:
            history = self.training_history
        if history is None:
            print("No training history available.")
            return
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(history["loss"])
        plt.title("Model Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.subplot(1, 2, 2)
        plt.plot(history["accuracy"])
        plt.title("Model Accuracy")
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.tight_layout()
        plt.show()
    def get_training_plot_path(self, history: Dict = None) -> str:
        """Render the training-history plot to a PNG file and return its path.

        Returns None when there is no usable history.
        """
        if history is None:
            history = self.training_history
        if history is None or "loss" not in history or len(history["loss"]) == 0:
            return None
        try:
            plt.figure(figsize=(12, 5))
            # Filter out NaN values before plotting
            loss_values = [x for x in history["loss"] if not np.isnan(x)]
            acc_values = [x for x in history["accuracy"] if not np.isnan(x)]
            if len(loss_values) == 0 or len(acc_values) == 0:
                return None
            # Plot loss
            plt.subplot(1, 2, 1)
            plt.plot(loss_values)
            plt.title("Model Loss")
            plt.xlabel("Epoch")
            plt.ylabel("Loss")
            # Plot accuracy
            plt.subplot(1, 2, 2)
            plt.plot(acc_values)
            plt.title("Model Accuracy")
            plt.xlabel("Epoch")
            plt.ylabel("Accuracy")
            plt.tight_layout()
            # Save directly to a file; returning a short path (rather than a
            # base64 data URI) avoids "file name too long" errors in Gradio.
            img_path = "training_plot.png"
            plt.savefig(img_path, format="png")
            plt.close()
            return img_path
        except Exception as e:
            print(f"Error generating training plot: {str(e)}")
            return None
    def chat(self):
        """Start a chat session in the console."""
        print("Chatbot: Hello! Type 'quit' to exit.")
        while True:
            user_input = input("You: ")
            if user_input.lower() in ["quit", "exit", "bye"]:
                print("Chatbot: Goodbye!")
                break
            intent, response, confidence = self.get_response(user_input)
            print(f"Chatbot ({intent}, {confidence:.2f}): {response}")
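

# Usage sketch (not executed): the typical Chatbot lifecycle, assuming an
# intents dict shaped like default_intents below.
#   bot = Chatbot()
#   bot.load_intents(default_intents)       # also builds the vocabulary
#   bot.build_model(hidden_layers=[32, 16])
#   bot.train(epochs=500, learning_rate=0.005)
#   intent, response, confidence = bot.get_response("hello")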
# Initialize the chatbot
chatbot = Chatbot()

# Default intents
default_intents = {
    "greeting": {
        "patterns": ["Hi", "Hello", "Hey", "Good morning", "What's up"],
        "responses": ["Hello!", "Hi there!", "Greetings!", "Hey! How can I help you?"],
    },
    "farewell": {
        "patterns": ["Bye", "See you", "Goodbye", "Later", "I'm leaving"],
        "responses": ["Goodbye!", "See you later!", "Farewell!", "Take care!"],
    },
    "thanks": {
        "patterns": ["Thanks", "Thank you", "Much appreciated", "Appreciate it"],
        "responses": ["You're welcome!", "No problem!", "Anytime!", "Glad to help!"],
    },
    "help": {
        "patterns": ["Help", "I need help", "Can you help me", "Support"],
        "responses": [
            "How can I help you?",
            "I'm here to assist you.",
            "What do you need help with?",
        ],
    },
}
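
# The intents.json file on disk uses this same schema:
#   {"<intent name>": {"patterns": [...], "responses": [...]}, ...}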
# Function to initialize the chatbot
def initialize_chatbot():
    global chatbot
    # Expected model file locations
    model_path = "chatbot_model.json"
    processor_path = "chatbot_model_processor.json"
    intents_names_path = "chatbot_model_intents.json"
    intents_path = "intents.json"
    # Load intents from file if available, otherwise fall back to defaults
    if os.path.exists(intents_path):
        try:
            chatbot.load_intents_from_file(intents_path)
            print(f"Loaded intents from {intents_path}")
        except Exception as e:
            print(f"Error loading intents: {e}")
            print("Loading default intents")
            chatbot.load_intents(default_intents)
    else:
        print("No intents file found. Loading default intents")
        chatbot.load_intents(default_intents)
        # Save default intents
        chatbot.save_intents(intents_path)
    # Load an existing model only if all of its companion files exist
    if (
        os.path.exists(model_path)
        and os.path.exists(processor_path)
        and os.path.exists(intents_names_path)
    ):
        try:
            # Load the model
            chatbot.load_model(model_path)
            # Load the text processor
            chatbot.text_processor = TextProcessor.load(processor_path)
            # Load intent names and settings
            with open(intents_names_path, "r") as f:
                intents_data = json.load(f)
            chatbot.intent_names = intents_data["intent_names"]
            chatbot.confidence_threshold = intents_data.get(
                "confidence_threshold", 0.5
            )
            chatbot.default_response = intents_data.get(
                "default_response",
                "I'm not sure I understand. Could you rephrase that?",
            )
            print(f"Loaded existing model from {model_path}")
        except Exception as e:
            print(f"Error loading model: {e}")
            print("Built a fresh untrained model; train it from the Training tab")
            chatbot.build_model(hidden_layers=[32, 16])
    else:
        print(
            "No model found or incomplete model files. "
            "Built a fresh untrained model; train it from the Training tab"
        )
        chatbot.build_model(hidden_layers=[32, 16])
# Build or load the model and vocabulary at startup
initialize_chatbot()
# Simple programmatic helpers (the Chat tab below wires its own handlers)
def respond(message, history):
    """Return the chatbot's response to a single message."""
    if not message:
        return "Please type a message."
    # Get response from chatbot
    intent, response, confidence = chatbot.get_response(message)
    # Simulate a short "thinking" delay
    time.sleep(0.5)
    return response


def get_intent_info(message):
    """Return the predicted intent and confidence for a message."""
    if not message:
        return "N/A", 0.0
    intent, confidence = chatbot.predict(message)
    return intent, float(confidence)
# Function to add a new intent
def add_intent(intent_name, patterns, responses):
    if not intent_name or not patterns or not responses:
        return "Please fill all fields"
    # Split patterns and responses (one entry per line)
    pattern_list = [p.strip() for p in patterns.split("\n") if p.strip()]
    response_list = [r.strip() for r in responses.split("\n") if r.strip()]
    if not pattern_list or not response_list:
        return "Please provide at least one pattern and one response"
    if intent_name in chatbot.intents:
        # Update existing intent
        chatbot.intents[intent_name]["patterns"].extend(pattern_list)
        chatbot.intents[intent_name]["responses"].extend(response_list)
    else:
        # Add new intent
        chatbot.intents[intent_name] = {
            "patterns": pattern_list,
            "responses": response_list,
        }
        chatbot.intent_names.append(intent_name)
    # Save intents. Note: retrain from the Training tab before the new
    # patterns take effect, since training rebuilds the vocabulary.
    chatbot.save_intents("intents.json")
    return f"Intent '{intent_name}' added/updated successfully"
# Train the model from the UI inputs
def train_model(epochs, learning_rate, hidden_layers_str, dropout_rate):
    try:
        # Parse hidden layers, e.g. "32, 16" -> [32, 16]
        hidden_layers = [
            int(x.strip()) for x in hidden_layers_str.split(",") if x.strip()
        ]
        if not hidden_layers:
            return (
                "Error: Invalid hidden layer format. Use comma-separated numbers, e.g. '32,16'",
                None,
            )
        # Convert inputs; cap the learning rate for numerical stability
        epochs = int(epochs)
        learning_rate = min(float(learning_rate), 0.005)
        dropout_rate = float(dropout_rate)
        # Validate intents and vocabulary
        if len(chatbot.intent_names) < 2:
            return (
                "Error: Need at least 2 intents for training. Please add more intents.",
                None,
            )
        # Rebuild the vocabulary so patterns added since startup are included
        chatbot.load_intents(chatbot.intents)
        if chatbot.text_processor.vocabulary_size == 0:
            return (
                "Error: No vocabulary built. Please add more patterns to your intents.",
                None,
            )
        # Rebuild the model with the new architecture
        chatbot.build_model(hidden_layers=hidden_layers, dropout_rate=dropout_rate)
        # Train the model
        history = chatbot.train(epochs=epochs, learning_rate=learning_rate, verbose=True)
        # Check if training was successful
        if not history or "loss" not in history or not history["loss"]:
            return "Training failed - no history data returned", None
        # Format final loss and accuracy safely
        final_loss = history["loss"][-1] if history["loss"] else 0
        final_accuracy = history["accuracy"][-1] if history["accuracy"] else 0
        final_loss_str = "NaN" if np.isnan(final_loss) else f"{final_loss:.4f}"
        final_accuracy_str = (
            "NaN" if np.isnan(final_accuracy) else f"{final_accuracy:.4f}"
        )
        # Save the model
        chatbot.save_model("chatbot_model.json")
        # Generate the training-history plot
        img_path = chatbot.get_training_plot_path(history)
        return (
            f"Model trained successfully with:\n"
            f"- Epochs: {epochs}\n"
            f"- Learning Rate: {learning_rate}\n"
            f"- Hidden Layers: {hidden_layers}\n"
            f"- Dropout Rate: {dropout_rate}\n"
            f"- Final Loss: {final_loss_str}\n"
            f"- Final Accuracy: {final_accuracy_str}"
        ), img_path
    except Exception as e:
        import traceback

        error_details = traceback.format_exc()
        return f"Error training model: {str(e)}\n\nDetails:\n{error_details}", None
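

# Note: train_model takes the raw UI values, e.g. train_model(500, 0.01, "32, 16", 0.2),
# and returns a (status_message, plot_path_or_None) pair for its two outputs.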
# Function to load an existing model
def load_model_from_file(file_obj):
    if not file_obj:
        return "No file uploaded"
    try:
        file_path = file_obj.name
        # Check file extension
        if not file_path.endswith(".json"):
            return "Please upload a JSON model file"
        # Load the model
        chatbot.load_model(file_path)
        # Derive the paths of the companion files
        base_name = os.path.splitext(file_path)[0]
        processor_path = f"{base_name}_processor.json"
        intents_names_path = f"{base_name}_intents.json"
        # Load the companion files when present
        if os.path.exists(processor_path):
            chatbot.text_processor = TextProcessor.load(processor_path)
        if os.path.exists(intents_names_path):
            with open(intents_names_path, "r") as f:
                intents_data = json.load(f)
            chatbot.intent_names = intents_data["intent_names"]
            chatbot.confidence_threshold = intents_data.get(
                "confidence_threshold", 0.5
            )
            chatbot.default_response = intents_data.get(
                "default_response",
                "I'm not sure I understand. Could you rephrase that?",
            )
        return f"Model loaded successfully from {file_path}"
    except Exception as e:
        return f"Error loading model: {str(e)}"
# Function to save the current model
def save_model():
    try:
        # Timestamped filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chatbot_model_{timestamp}.json"
        # Save the model
        chatbot.save_model(filename)
        return f"Model saved as {filename}"
    except Exception as e:
        return f"Error saving model: {str(e)}"
# Function to update settings
def update_settings(threshold, default_response):
    try:
        # Update settings
        chatbot.confidence_threshold = float(threshold)
        chatbot.default_response = default_response
        # Persist settings to the model intents file
        with open("chatbot_model_intents.json", "w") as f:
            json.dump(
                {
                    "intent_names": chatbot.intent_names,
                    "confidence_threshold": chatbot.confidence_threshold,
                    "default_response": chatbot.default_response,
                },
                f,
            )
        return "Settings updated successfully"
    except Exception as e:
        return f"Error updating settings: {str(e)}"
# Function to list intents as Markdown
def list_intents():
    if not chatbot.intents:
        return "No intents available"
    intents_info = ""
    for intent_name, intent_data in chatbot.intents.items():
        # Show up to three patterns/responses per intent
        patterns = ", ".join(intent_data["patterns"][:3])
        if len(intent_data["patterns"]) > 3:
            patterns += "..."
        responses = ", ".join(intent_data["responses"][:3])
        if len(intent_data["responses"]) > 3:
            responses += "..."
        intents_info += f"**Intent**: {intent_name}\n"
        intents_info += f"**Patterns**: {patterns}\n"
        intents_info += f"**Responses**: {responses}\n\n"
    return intents_info
# Function to edit an intent
def edit_intent(intent_name, new_patterns, new_responses):
    if not intent_name or intent_name not in chatbot.intents:
        return f"Intent '{intent_name}' not found"
    # Replace patterns and/or responses (one entry per line)
    if new_patterns:
        pattern_list = [p.strip() for p in new_patterns.split("\n") if p.strip()]
        if pattern_list:
            chatbot.intents[intent_name]["patterns"] = pattern_list
    if new_responses:
        response_list = [r.strip() for r in new_responses.split("\n") if r.strip()]
        if response_list:
            chatbot.intents[intent_name]["responses"] = response_list
    # Save intents (retrain from the Training tab for changes to take effect)
    chatbot.save_intents("intents.json")
    return f"Intent '{intent_name}' updated successfully"
# Function to delete an intent
def delete_intent(intent_name):
    if not intent_name or intent_name not in chatbot.intents:
        return f"Intent '{intent_name}' not found"
    # Delete intent
    del chatbot.intents[intent_name]
    chatbot.intent_names.remove(intent_name)
    # Save intents
    chatbot.save_intents("intents.json")
    return f"Intent '{intent_name}' deleted successfully"
# Get the list of intents (used to populate the dropdown at creation time)
def get_intent_list():
    return chatbot.intent_names


# Returning a plain list to a Dropdown output would set its *value*, not its
# choices, so event handlers use this update helper to repopulate the choices.
def refresh_intent_dropdown():
    return gr.update(choices=chatbot.intent_names)
# Function to export intents
def export_intents():
    try:
        # Timestamped filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"intents_{timestamp}.json"
        # Save intents
        with open(filename, "w") as f:
            json.dump(chatbot.intents, f, indent=4)
        return f"Intents exported as {filename}"
    except Exception as e:
        return f"Error exporting intents: {str(e)}"
# Function to import intents
def import_intents_from_file(file_obj):
    if not file_obj:
        return "No file uploaded"
    try:
        file_path = file_obj.name
        # Check file extension
        if not file_path.endswith(".json"):
            return "Please upload a JSON intents file"
        # Load intents
        with open(file_path, "r") as f:
            intents_data = json.load(f)
        # Validate intents format
        for intent_name, intent_data in intents_data.items():
            if (
                not isinstance(intent_data, dict)
                or "patterns" not in intent_data
                or "responses" not in intent_data
            ):
                return f"Invalid intent format for '{intent_name}'"
        # Update chatbot intents (also rebuilds the vocabulary)
        chatbot.load_intents(intents_data)
        # Save intents
        chatbot.save_intents("intents.json")
        return f"Imported {len(intents_data)} intents from {file_path}"
    except Exception as e:
        return f"Error importing intents: {str(e)}"
# Function to get intent details for the edit form
def get_intent_details(intent_name):
    if not intent_name or intent_name not in chatbot.intents:
        return "", ""
    patterns = "\n".join(chatbot.intents[intent_name]["patterns"])
    responses = "\n".join(chatbot.intents[intent_name]["responses"])
    return patterns, responses
# Create the Gradio interface with multiple tabs
with gr.Blocks(title="Neural Network Chatbot", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 Neural Network Chatbot")
    gr.Markdown(
        """
This chatbot uses a neural network to understand and respond to your messages.

This chatbot application was developed by:

| **Name** | **Student ID** | **Email** |
|----------|----------------|-----------|
| AARJEYAN SHRESTHA | C0927422 | [email protected] |
| PRAJWAL LUITEL | C0927658 | [email protected] |
| RAJAN GHIMIRE | C0924991 | [email protected] |
| RISHABH JHA | C0923563 | [email protected] |
| SUDIP CHAUDHARY | C0922310 | [email protected] |

- **Course**: Software Tools and Emerging Technologies for AI and ML
- **Term**: 3rd
- **Instructor**: [Peter Sigurdson](https://www.linkedin.com/in/petersigurdson/)
"""
    )
    with gr.Tabs():
        # Chat tab
        with gr.Tab("Chat"):
            with gr.Row():
                with gr.Column(scale=3):
                    chatbot_interface = gr.Chatbot(label="Conversation", height=400)
                    with gr.Row():
                        msg = gr.Textbox(
                            placeholder="Type your message here...",
                            label="Your message",
                            lines=2,
                            show_label=False,
                        )
                        send_btn = gr.Button("Send", variant="primary")
                    with gr.Accordion("Examples", open=False):
                        gr.Examples(
                            examples=[
                                "Hello!",
                                "How are you?",
                                "What can you help me with?",
                                "Thank you",
                                "Goodbye",
                            ],
                            inputs=msg,
                        )
                with gr.Column(scale=1):
                    gr.Markdown("### Analysis")
                    intent_label = gr.Label(label="Predicted Intent")
                    confidence_score = gr.Number(label="Confidence Score")
                    gr.Markdown("### Settings")
                    confidence_slider = gr.Slider(
                        label="Confidence Threshold",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.05,
                        value=chatbot.confidence_threshold,
                    )
                    default_resp = gr.Textbox(
                        label="Default Response",
                        value=chatbot.default_response,
                        lines=2,
                    )
                    update_settings_btn = gr.Button("Update Settings")
                    settings_status = gr.Textbox(label="Status")
            # Event handlers for chat
            def user_message(message, history):
                return "", history + [[message, None]]

            def bot_message(history):
                if history:
                    last_message = history[-1][0]
                    intent, response, confidence = chatbot.get_response(last_message)
                    history[-1][1] = response
                    return history, intent, confidence
                return history, "N/A", 0.0

            msg.submit(
                user_message,
                [msg, chatbot_interface],
                [msg, chatbot_interface],
                queue=False,
            ).then(
                bot_message,
                chatbot_interface,
                [chatbot_interface, intent_label, confidence_score],
            )
            send_btn.click(
                user_message,
                [msg, chatbot_interface],
                [msg, chatbot_interface],
                queue=False,
            ).then(
                bot_message,
                chatbot_interface,
                [chatbot_interface, intent_label, confidence_score],
            )
            # Write the status into the component defined above (constructing
            # a new Textbox here would never be rendered)
            update_settings_btn.click(
                update_settings,
                [confidence_slider, default_resp],
                settings_status,
            )
        # Intents Management tab
        with gr.Tab("Intents Management"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add New Intent")
                    new_intent_name = gr.Textbox(label="Intent Name")
                    new_patterns = gr.Textbox(label="Patterns (one per line)", lines=5)
                    new_responses = gr.Textbox(
                        label="Responses (one per line)", lines=5
                    )
                    add_intent_btn = gr.Button("Add Intent", variant="primary")
                    add_intent_status = gr.Textbox(label="Status")
                with gr.Column():
                    gr.Markdown("### Edit Intent")
                    edit_intent_dropdown = gr.Dropdown(
                        label="Select Intent to Edit",
                        choices=get_intent_list(),
                        interactive=True,
                    )
                    edit_patterns = gr.Textbox(label="Patterns (one per line)", lines=5)
                    edit_responses = gr.Textbox(
                        label="Responses (one per line)", lines=5
                    )
                    with gr.Row():
                        update_intent_btn = gr.Button("Update Intent")
                        delete_intent_btn = gr.Button("Delete Intent", variant="stop")
                    edit_intent_status = gr.Textbox(label="Status")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Import/Export Intents")
                    with gr.Row():
                        export_intents_btn = gr.Button("Export Intents")
                        import_intents_file = gr.File(
                            label="Import Intents (JSON file)"
                        )
                    import_export_status = gr.Textbox(label="Status")
                with gr.Column():
                    gr.Markdown("### Current Intents")
                    refresh_intents_btn = gr.Button("Refresh Intents List")
                    intents_list = gr.Markdown()
            # Event handlers for intents management
            add_intent_btn.click(
                add_intent,
                [new_intent_name, new_patterns, new_responses],
                add_intent_status,
            )
            # Repopulate dropdown choices when intents are added/deleted/imported
            add_intent_btn.click(refresh_intent_dropdown, [], edit_intent_dropdown)
            edit_intent_dropdown.change(
                get_intent_details,
                edit_intent_dropdown,
                [edit_patterns, edit_responses],
            )
            update_intent_btn.click(
                edit_intent,
                [edit_intent_dropdown, edit_patterns, edit_responses],
                edit_intent_status,
            )
            delete_intent_btn.click(
                delete_intent, edit_intent_dropdown, edit_intent_status
            ).then(refresh_intent_dropdown, [], edit_intent_dropdown)
            export_intents_btn.click(export_intents, [], import_export_status)
            import_intents_file.change(
                import_intents_from_file, import_intents_file, import_export_status
            ).then(refresh_intent_dropdown, [], edit_intent_dropdown)
            refresh_intents_btn.click(list_intents, [], intents_list)
        # Training tab
        with gr.Tab("Training"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Train Model")
                    epochs_input = gr.Number(
                        label="Epochs", value=500, minimum=100, maximum=5000, step=100
                    )
                    learning_rate_input = gr.Number(
                        label="Learning Rate",
                        value=0.01,
                        minimum=0.0001,
                        maximum=0.1,
                        step=0.001,
                    )
                    hidden_layers_input = gr.Textbox(
                        label="Hidden Layers (comma-separated)", value="32, 16"
                    )
                    dropout_rate_input = gr.Number(
                        label="Dropout Rate",
                        value=0.2,
                        minimum=0.0,
                        maximum=0.5,
                        step=0.05,
                    )
                    train_btn = gr.Button("Train Model", variant="primary")
                with gr.Column():
                    training_status = gr.Textbox(label="Training Status", lines=6)
                    training_plot = gr.Image(label="Training History")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Model Management")
                    save_model_btn = gr.Button("Save Current Model")
                    load_model_file = gr.File(label="Load Model (JSON file)")
                    model_status = gr.Textbox(label="Status")
            # Event handlers for training
            train_btn.click(
                train_model,
                [
                    epochs_input,
                    learning_rate_input,
                    hidden_layers_input,
                    dropout_rate_input,
                ],
                [training_status, training_plot],
            )
            save_model_btn.click(save_model, [], model_status)
            load_model_file.change(load_model_from_file, load_model_file, model_status)
        # About tab
        with gr.Tab("About"):
            gr.Markdown(
                """
## Neural Network Chatbot

This chatbot uses a neural network to understand and respond to user messages.
The model is trained on a set of intents, each with patterns and responses.

### Features:
- **Neural Network Backend**: A fully connected neural network with configurable layers.
- **Intent Recognition**: Recognizes user intents based on trained patterns.
- **Customizable Responses**: Each intent has multiple possible responses for variety.
- **Training Interface**: Train the model directly from the web interface.
- **Intent Management**: Add, edit, delete, import, and export intents.
- **Model Management**: Save and load models for future use.

### How to Use:
1. **Chat Tab**: Interact with the chatbot.
2. **Intents Management Tab**: Manage the chatbot's knowledge.
3. **Training Tab**: Train the neural network model.
4. **About Tab**: Learn about the chatbot and its features.

### Technical Details:
- Built with Python, NumPy, and Gradio.
- Uses a bag-of-words approach for text representation.
- Neural network with configurable hidden layers and activation functions.
- Cross-entropy loss for multi-class classification.

Created for deployment on Hugging Face Spaces.
"""
            )
# Launch the app
if __name__ == "__main__":
    demo.launch()