ChatbotUNAJ / app.py
mauroleguiok's picture
Update app.py
40b078b verified
# -*- coding: utf-8 -*-
"""Copia de Modelo_Chatbot_Final_3_con_Gradio.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1sFAltehLtdNpHoQVsiDeikgSJkIDTWrD
"""
import warnings
warnings.filterwarnings('ignore')
import json
import numpy as np
import pandas as pd
import random
from matplotlib import pyplot as plt
import seaborn as sns
from wordcloud import WordCloud,STOPWORDS
import missingno as msno
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
#from keras.preprocessing import text
import keras
from keras.models import Sequential
from keras.layers import Dense,Embedding,LSTM,Dropout
from keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.preprocessing.sequence import pad_sequences
import nltk
from nltk import word_tokenize
from nltk.stem import PorterStemmer
import torch
from torch.utils.data import Dataset
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from transformers import pipeline
from transformers import DistilBertTokenizerFast
from transformers import BertForSequenceClassification, BertTokenizerFast
from transformers import TFDistilBertForSequenceClassification, Trainer, TFTrainingArguments
from transformers import BertTokenizer, TFBertForSequenceClassification, BertConfig
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback
import re
import language_tool_python
import logging
import spacy
def load_json_file(filename):
with open(filename) as f:
file = json.load(f)
return file
filename = 'intents_aumentado.json'
intents = load_json_file(filename)
def create_df():
df = pd.DataFrame({
'Pattern' : [],
'Tag' : []
})
return df
df = create_df()
df
def extract_json_info(json_file, df):
for intent in json_file['intents']:
for pattern in intent['patterns']:
sentence_tag = [pattern, intent['tag']]
df.loc[len(df.index)] = sentence_tag
return df
df = extract_json_info(intents, df)
df.head()
df2 = df.copy()
df2.head()
import nltk
nltk.download('punkt_tab')
stemmer = PorterStemmer()
ignore_words=['?', '!', ',', '.']
def preprocess_pattern(pattern):
words = word_tokenize(pattern.lower())
stemmed_words = [stemmer.stem(word) for word in words if word not in ignore_words]
return " ".join(stemmed_words)
df['Pattern'] = df['Pattern'].apply(preprocess_pattern)
df2.head()
labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
labels
num_labels = len(labels)
id2label = {id:label for id, label in enumerate(labels)}
label2id = {label:id for id, label in enumerate(labels)}
id2label
label2id
df2['labels'] = df2['Tag'].map(lambda x: label2id[x.strip()])
df2.head()
X = list(df2['Pattern'])
X[:5]
y = list(df2['labels'])
y[:5]
X_train,X_test,y_train,y_test = train_test_split(X,y,random_state = 123)
model_name = "dccuchile/bert-base-spanish-wwm-cased"
max_len = 256
tokenizer = BertTokenizer.from_pretrained(model_name,
max_length=max_len)
model = BertForSequenceClassification.from_pretrained(model_name,
num_labels=num_labels,
id2label=id2label,
label2id = label2id)
train_encoding = tokenizer(X_train, truncation=True, padding=True)
test_encoding = tokenizer(X_test, truncation=True, padding=True)
full_data = tokenizer(X, truncation=True, padding=True)
class DataLoader(Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
item['labels'] = torch.tensor(self.labels[idx])
return item
def __len__(self):
return len(self.labels)
train_dataloader = DataLoader(train_encoding, y_train)
test_dataloader = DataLoader(test_encoding, y_test)
fullDataLoader = DataLoader(full_data, y_test)
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro')
acc = accuracy_score(labels, preds)
return {
'accuracy': acc,
'f1': f1,
'precision': precision,
'recall': recall
}
# Parametros finales del modelo
training_args = TrainingArguments(
output_dir='./output', # Carpeta donde se guardarán los modelos entrenados y checkpoints
do_train=True,
do_eval=True,
num_train_epochs=30, # Número total de épocas (pasadas completas por el dataset de entrenamiento)
per_device_train_batch_size=8, # Tamaño del batch de entrenamiento por dispositivo (GPU o CPU)
per_device_eval_batch_size=32, # Tamaño del batch de evaluación por dispositivo (mayor para evaluar más rápido)
gradient_accumulation_steps=4, # Acumula gradientes por 4 pasos antes de hacer una actualización (simula batch más grande)
learning_rate=2e-5, # Tasa de aprendizaje inicial
warmup_ratio=0.1, # Porcentaje de pasos de calentamiento (warmup) sobre el total de pasos de entrenamiento
weight_decay=0.1, # Regularización para evitar overfitting penalizando grandes pesos
lr_scheduler_type="cosine", # Tipo de scheduler para modificar la tasa de aprendizaje (coseno en este caso)
#fp16=True, # Usa precisión mixta (float16) para acelerar entrenamiento si hay soporte (ej. en GPUs)
evaluation_strategy="steps",
eval_steps=50, # Evalúa el modelo cada 50 pasos de entrenamiento
save_strategy="steps",
save_steps=50, # Guarda el modelo cada 50 pasos
save_total_limit=3, # Mantiene solo los últimos 3 checkpoints, borra los anteriores
logging_strategy="steps",
logging_dir='./multi-class-logs', # Carpeta donde se guardarán los logs de entrenamiento cada 50 pasos
logging_steps=50, #
load_best_model_at_end=True, # Carga automáticamente el mejor modelo evaluado al finalizar el entrenamiento
metric_for_best_model="f1", # Métrica que se usa para definir cuál fue el "mejor" modelo
greater_is_better=True
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataloader,
eval_dataset=test_dataloader,
compute_metrics=compute_metrics,
callbacks=[EarlyStoppingCallback(early_stopping_patience=3)] # Frena el entrenamiento si no mejora la métrica de evaluación después de 3 evaluaciones consecutiva
)
import os
os.environ["WANDB_API_KEY"] = "4fb9dff0336a34ab812e86f91b6c3a877cb25b36"
trainer.train()
q=[trainer.evaluate(eval_dataset=df2) for df2 in [train_dataloader, test_dataloader]]
pd.DataFrame(q, index=["train","test"]).iloc[:,:5]
def predict(text):
inputs = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors="pt").to("cuda")
outputs = model(**inputs)
probs = outputs[0].softmax(1)
pred_label_idx = probs.argmax()
pred_label = model.config.id2label[pred_label_idx.item()]
return probs, pred_label_idx, pred_label
text = "Hola"
predict(text)
model_path = "chatbot"
trainer.save_model(model_path)
tokenizer.save_pretrained(model_path)
model_path = "/content/chatbot"
model = BertForSequenceClassification.from_pretrained(model_path)
tokenizer= BertTokenizerFast.from_pretrained(model_path)
chatbot= pipeline("text-classification", model=model, tokenizer=tokenizer)
chatbot("Hola")
negaciones = [
"no", "nunca", "nadie", "ningún", "ninguna", "nada",
"jamás", "jamas", "ni", "tampoco", "de ninguna manera",
"en absoluto", "en ningún caso", "no es cierto",
"no estoy de acuerdo", "no me parece", "no creo",
"no quiero", "no puedo", "no quiero hacerlo", "no acepto", "no gracias"
]
afirmaciones = [
"sí", "si", "claro", "por supuesto", "entendido", "estoy de acuerdo",
"acepto", "exacto", "correcto", "eso es", "está bien", "esta bien",
"claro que sí", "lo creo", "es cierto", "sin duda", "así es", "claro que si"
"perfecto", "me parece bien", "seguro", "definitivamente", "por supuesto"
]
# Asumiendo que intents ya está definido
def obtener_respuesta_aleatoria(tag):
"""Busca el intent correspondiente al tag y devuelve una respuesta aleatoria."""
for intent in intents['intents']:
if intent['tag'] == tag:
return random.choice(intent['responses'])
return "No tengo respuesta para eso."
# Asumiendo que intents ya está definido
def obtener_lista_de_respuesta(tag):
"""Busca el intent correspondiente al tag y devuelve una respuesta aleatoria."""
for intent in intents['intents']:
if intent['tag'] == tag:
return intent['responses']
return "No tengo respuesta para eso."
historial_respuestas = {}
def obtener_respuesta_sin_repetir(label):
global historial_respuestas
# Si es la primera vez que se usa el label, inicializar historial
if label not in historial_respuestas:
historial_respuestas[label] = []
respuestas_posibles = obtener_lista_de_respuesta(label) # Lista de respuestas disponibles para el tag
# Filtrar respuestas que aún no se usaron
respuestas_disponibles = [r for r in respuestas_posibles if r not in historial_respuestas[label]]
if not respuestas_disponibles:
historial_respuestas[label] = [] # Resetear historial cuando se agoten todas
# Elegir una nueva respuesta sin repetir
nueva_respuesta = random.choice([r for r in respuestas_posibles if r not in historial_respuestas[label]])
# Agregarla al historial
historial_respuestas[label].append(nueva_respuesta)
return nueva_respuesta
def corregir_preguntas(texto, idioma='es'):
"""
Corrige la ortografía y gramática del texto usando LanguageTool.
También ajusta signos de interrogación y acentos en palabras interrogativas.
Ignora correcciones sobre las palabras: 'unaj', 'arturo', 'jauretche'.
"""
try:
# Agregar "?" si el texto tiene 3 o más palabras y no termina con "?"
if len(texto.split()) >= 3 and not texto.endswith("?"):
texto += "?"
# Palabras a ignorar (en minúsculas)
palabras_ignorar = ["unaj", "arturo", "jauretche", "profode"]
reemplazos = {}
# Reemplazar palabras ignoradas por marcadores temporales
def reemplazar_ignoradas(match):
palabra = match.group(0)
marcador = f"__IGNORAR_{len(reemplazos)}__"
reemplazos[marcador] = palabra
return marcador
patron_ignorar = re.compile(r'\b(' + '|'.join(palabras_ignorar) + r')\b', re.IGNORECASE)
texto_temporal = patron_ignorar.sub(reemplazar_ignoradas, texto)
# Inicializar el corrector de LanguageTool
tool = language_tool_python.LanguageToolPublicAPI(idioma)
# Obtener las correcciones sugeridas
texto_corregido = tool.correct(texto_temporal)
# Restaurar las palabras ignoradas
for marcador, palabra_original in reemplazos.items():
texto_corregido = texto_corregido.replace(marcador, palabra_original)
# Diccionario con palabras interrogativas y sus versiones acentuadas
palabras_interrogativas = {
"como": "cómo", "cuando": "cuándo", "donde": "dónde", "que": "qué",
"quien": "quién", "cual": "cuál", "cuanto": "cuánto"
}
# Si la oración es interrogativa, corregir solo la primera palabra interrogativa
if texto_corregido.endswith("?"):
palabras = texto_corregido[:-1].split() # Remover "?" y dividir en palabras
primera_encontrada = False
for i, palabra in enumerate(palabras):
palabra_limpia = palabra.lower().strip("¿") # Remover el signo "¿" si existe
# Si es una palabra interrogativa y es la primera encontrada, corregir
if palabra_limpia in palabras_interrogativas:
if not primera_encontrada:
palabras[i] = palabras_interrogativas[palabra_limpia]
primera_encontrada = True
texto_corregido = " ".join(palabras) + "?"
# Asegurar que la oración comienza con "¿"
if not texto_corregido.startswith("¿"):
texto_corregido = "¿" + texto_corregido
# Mantener solo el último signo "¿" y eliminar los anteriores
if texto_corregido.count("¿") > 1:
ultima_pos = texto_corregido.rfind("¿")
texto_corregido = texto_corregido[:ultima_pos].replace("¿", "") + texto_corregido[ultima_pos:]
return texto_corregido
except Exception:
return texto
def normalizar_clave(texto):
reemplazos = {
"á": "a", "é": "e", "í": "i", "ó": "o", "ú": "u",
"ä": "a", "ë": "e", "ï": "i", "ö": "o", "ü": "u"
}
for acentuada, normal in reemplazos.items():
texto = texto.replace(acentuada, normal)
return texto.strip().replace(" ", "+")
estado_chatbot = {
"esperando_confirmacion": False,
"opciones": [],
"texto_original": ""
}
def obtener_respuesta_chatbot(text):
global estado_chatbot
# Si no está esperando confirmación, resetea su estado antes de procesar la nueva consulta
if not estado_chatbot["esperando_confirmacion"]:
estado_chatbot["opciones"] = [] # Limpia opciones anteriores
estado_chatbot["texto_original"] = "" # Resetea el estado previo
if estado_chatbot["esperando_confirmacion"]:
if text.lower() in afirmaciones:
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado
respuesta = obtener_respuesta_sin_repetir(estado_chatbot["opciones"][0]["label"])
estado_chatbot["opciones"] = [] # Limpia opciones anteriores
estado_chatbot["texto_original"] = "" # Resetea el estado previo
return respuesta
elif text.lower() in negaciones:
if len(estado_chatbot["opciones"]) > 1:
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado
return obtener_respuesta_sin_repetir(estado_chatbot["opciones"][1]["label"])
else:
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado
return "No tengo más opciones, ¿podés reformular la pregunta?"
else:
return "Por favor, respondé 'sí' o 'no'."
prediction = chatbot(text)
prediction = sorted(prediction, key=lambda x: x["score"], reverse=True) # Ordenar por score
label_principal = prediction[0]["label"]
score_principal = prediction[0]["score"]
if score_principal >= 0.1:
print("(Grado de seguridad en la respuesta: ", score_principal, ")")
return obtener_respuesta_sin_repetir(label_principal)
elif 0.20 <= score_principal < 0.4:
estado_chatbot["esperando_confirmacion"] = True
estado_chatbot["opciones"] = prediction[:2]
estado_chatbot["texto_original"] = text
opciones_texto = ", ".join(
[f"{alt['label']} ({alt['score']:.2f})" for alt in estado_chatbot["opciones"]]
)
return f"No estoy seguro de la respuesta correscta. ¿Te referís a alguna de estas opciones? Opción 1: {opciones_texto} (Si/No)"
else:
print(f"No tengo una respuesta precisa. ¿Puedes decirme una palabra clave de tu pregunta para que pueda ayudarte? Ingresa una o dos palabras:")
clave = "info"
clave = normalizar_clave(clave)
# Verificar si la palabra clave no es una negación
if clave not in negaciones:
# Si no es una negación, proporcionar el enlace
return f"Prueba consultando el siguiente enlace: https://www.unaj.edu.ar/?s={clave} o reformula tu pregunta. Escribela a continuación:"
else:
# Si la clave es una negación, podrías manejarlo aquí
return "Comprendo, intenta reformular tu pregunta por favor para que pueda entenderla. Prueba usando frases cortas y claras."
faq = ['¿Cuándo puedo inscribirme a carreras?', '¿Qué carreras tiene la UNAJ?', '¿Qué posgrados tiene la UNAJ?', '¿Qué cursos de oficios tiene la UNAJ y cómo puedo inscribirme?', '¿Qué otras propuestas de formación ofrece la UNAJ?', '¿Puedo estudiar idiomas en la UNAJ?', '¿Qué hago si no puedo ingresar al SIU GUARANÍ?', '¿Cuándo comienza y termina el cuatrimestre?', '¿Dónde encuentro el calendario académico?', '¿Cuándo puedo reincorporarme?', '¿Cuándo puedo cambiar de carrera?', '¿Cómo pido equivalencias?', '¿Cómo pido una licencia estudiantil?']
for f in faq:
print(f)
print(obtener_respuesta_chatbot(corregir_preguntas(f.lower())))
print("-------------------------------------------------------------------------------------------------------------------------------------")
# Instalar librerías necesarias
import gradio as gr
from pyngrok import ngrok
import pandas as pd
from datetime import datetime
from bs4 import BeautifulSoup
# Nombre del archivo CSV
feedback_file = "feedback.csv"
# Inicializar CSV si no existe
def init_csv():
try:
pd.read_csv(feedback_file)
except FileNotFoundError:
df = pd.DataFrame(columns=["timestamp", "question", "response", "feedback"])
df.to_csv(feedback_file, index=False)
# Extrae el texto de la respuesta HTML
def limpiar_html(texto_html):
return BeautifulSoup(texto_html, "html.parser").get_text()
# Guardar el feedback cuando se hace clic en "Nuevo Mensaje"
def guardar_feedback(question, response, feedback):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response_limpia = limpiar_html(response)
new_data = pd.DataFrame([[timestamp, question, response_limpia, feedback]],
columns=["timestamp", "question", "response", "feedback"])
new_data.to_csv(feedback_file, mode='a', header=False, index=False)
return "", "", 10, gr.update(interactive=False) # limpia todo y desactiva "Nuevo Mensaje"
# Convierte las URL en hipervinculos clickeables
def convertir_urls_a_links(texto):
# Expresión regular para encontrar URLs
url_pattern = r"(https?://[^\s]+)"
# Reemplaza cada URL por una etiqueta <a>
return re.sub(url_pattern, r'<a href="\1" target="_blank">\1</a>', texto)
# Simulación de respuesta del chatbot (reemplazar por tu modelo real)
def chatbot_response(question):
mensaje_corregido = corregir_preguntas(question)
respuesta_raw = obtener_respuesta_chatbot(mensaje_corregido)
respuesta = convertir_urls_a_links(respuesta_raw)
# Agrega un contenedor con estilo para simular una caja
respuesta_contenedor = f"""
<div style='background-color:#2b2b2b; color:#f1f1f1; border:0px solid #515057;
padding:10px; border-radius:5px; white-space:pre-wrap'>
{respuesta}
</div>
"""
return respuesta_contenedor, gr.update(visible=True, interactive=True)
# Inicializamos el CSV
init_csv()
# Interfaz Gradio
with gr.Blocks(css="""
body {
background-color: black;
color: #00ffff;
}
.gr-button {
background-color: #00ffff !important;
color: black !important;
}
.gr-textbox textarea, .gr-number input {
background-color: #111;
color: #00ffff;
border: 1px solid #00ffff;
}
""") as demo:
with gr.Row():
gr.HTML("<div style='flex:1'></div><img src='https://guarani.unaj.edu.ar/_comp/unaj/img/logo-transparente.png' height='60px' style='margin:10px'/>")
gr.Markdown("# Chatbot UNAJ\n## Hola! Soy Arturito, el bot de la UNAJ y estoy para responder tus preguntas sobre la Universidad Nacional Arturo Jauretche")
question_input = gr.Textbox(label="Mensaje", placeholder="Escribí tu consulta...")
submit_btn = gr.Button("Enviar Mensaje")
# Se usa un HTML para que los links de la respuesta sean clickeables
response_output = gr.HTML()
# Se coloca un slider que permite captar el feedback de la respuesta
feedback_slider = gr.Slider(minimum=1, maximum=10, value=10, step=1,
label="¿Qué tan útil fue la respuesta? (1 = Nada útil, 10 = Muy útil)", interactive=True)
# Aparece un boton para "Nuevo Mensaje" que limpia el cuadro de "Mensaje" y guarda la respuesta y puntuación.
new_message_btn = gr.Button("Nuevo Mensaje", visible=False)
# Evento al hacer clic en "Enviar Mensaje"
submit_btn.click(fn=chatbot_response,
inputs=question_input,
outputs=[response_output, new_message_btn])
# Evento al hacer clic en "Nuevo Mensaje"
new_message_btn.click(fn=guardar_feedback,
inputs=[question_input, response_output, feedback_slider],
outputs=[question_input, response_output, feedback_slider, new_message_btn])
demo.launch(share=True)