Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
"""Copia de Modelo_Chatbot_Final_3_con_Gradio.ipynb | |
Automatically generated by Colab. | |
Original file is located at | |
https://colab.research.google.com/drive/1sFAltehLtdNpHoQVsiDeikgSJkIDTWrD | |
""" | |
import warnings | |
warnings.filterwarnings('ignore') | |
import json | |
import numpy as np | |
import pandas as pd | |
import random | |
from matplotlib import pyplot as plt | |
import seaborn as sns | |
from wordcloud import WordCloud,STOPWORDS | |
import missingno as msno | |
from sklearn.feature_extraction.text import CountVectorizer | |
from sklearn.model_selection import train_test_split | |
from sklearn.metrics import accuracy_score, precision_recall_fscore_support | |
#from keras.preprocessing import text | |
import keras | |
from keras.models import Sequential | |
from keras.layers import Dense,Embedding,LSTM,Dropout | |
from keras.callbacks import ReduceLROnPlateau | |
from tensorflow.keras.preprocessing.sequence import pad_sequences | |
import nltk | |
from nltk import word_tokenize | |
from nltk.stem import PorterStemmer | |
import torch | |
from torch.utils.data import Dataset | |
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification | |
from transformers import pipeline | |
from transformers import DistilBertTokenizerFast | |
from transformers import BertForSequenceClassification, BertTokenizerFast | |
from transformers import TFDistilBertForSequenceClassification, Trainer, TFTrainingArguments | |
from transformers import BertTokenizer, TFBertForSequenceClassification, BertConfig | |
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback | |
import re | |
import language_tool_python | |
import logging | |
import spacy | |
def load_json_file(filename): | |
with open(filename) as f: | |
file = json.load(f) | |
return file | |
filename = 'intents_aumentado.json' | |
intents = load_json_file(filename) | |
def create_df(): | |
df = pd.DataFrame({ | |
'Pattern' : [], | |
'Tag' : [] | |
}) | |
return df | |
df = create_df() | |
df | |
def extract_json_info(json_file, df): | |
for intent in json_file['intents']: | |
for pattern in intent['patterns']: | |
sentence_tag = [pattern, intent['tag']] | |
df.loc[len(df.index)] = sentence_tag | |
return df | |
df = extract_json_info(intents, df) | |
df.head() | |
df2 = df.copy() | |
df2.head() | |
import nltk | |
nltk.download('punkt_tab') | |
stemmer = PorterStemmer() | |
ignore_words=['?', '!', ',', '.'] | |
def preprocess_pattern(pattern): | |
words = word_tokenize(pattern.lower()) | |
stemmed_words = [stemmer.stem(word) for word in words if word not in ignore_words] | |
return " ".join(stemmed_words) | |
df['Pattern'] = df['Pattern'].apply(preprocess_pattern) | |
df2.head() | |
labels = df2['Tag'].unique().tolist() | |
labels = [s.strip() for s in labels] | |
labels | |
num_labels = len(labels) | |
id2label = {id:label for id, label in enumerate(labels)} | |
label2id = {label:id for id, label in enumerate(labels)} | |
id2label | |
label2id | |
df2['labels'] = df2['Tag'].map(lambda x: label2id[x.strip()]) | |
df2.head() | |
X = list(df2['Pattern']) | |
X[:5] | |
y = list(df2['labels']) | |
y[:5] | |
X_train,X_test,y_train,y_test = train_test_split(X,y,random_state = 123) | |
model_name = "dccuchile/bert-base-spanish-wwm-cased" | |
max_len = 256 | |
tokenizer = BertTokenizer.from_pretrained(model_name, | |
max_length=max_len) | |
model = BertForSequenceClassification.from_pretrained(model_name, | |
num_labels=num_labels, | |
id2label=id2label, | |
label2id = label2id) | |
train_encoding = tokenizer(X_train, truncation=True, padding=True) | |
test_encoding = tokenizer(X_test, truncation=True, padding=True) | |
full_data = tokenizer(X, truncation=True, padding=True) | |
class DataLoader(Dataset): | |
def __init__(self, encodings, labels): | |
self.encodings = encodings | |
self.labels = labels | |
def __getitem__(self, idx): | |
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} | |
item['labels'] = torch.tensor(self.labels[idx]) | |
return item | |
def __len__(self): | |
return len(self.labels) | |
train_dataloader = DataLoader(train_encoding, y_train) | |
test_dataloader = DataLoader(test_encoding, y_test) | |
fullDataLoader = DataLoader(full_data, y_test) | |
def compute_metrics(pred): | |
labels = pred.label_ids | |
preds = pred.predictions.argmax(-1) | |
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro') | |
acc = accuracy_score(labels, preds) | |
return { | |
'accuracy': acc, | |
'f1': f1, | |
'precision': precision, | |
'recall': recall | |
} | |
# Parametros finales del modelo | |
training_args = TrainingArguments( | |
output_dir='./output', # Carpeta donde se guardarán los modelos entrenados y checkpoints | |
do_train=True, | |
do_eval=True, | |
num_train_epochs=30, # Número total de épocas (pasadas completas por el dataset de entrenamiento) | |
per_device_train_batch_size=8, # Tamaño del batch de entrenamiento por dispositivo (GPU o CPU) | |
per_device_eval_batch_size=32, # Tamaño del batch de evaluación por dispositivo (mayor para evaluar más rápido) | |
gradient_accumulation_steps=4, # Acumula gradientes por 4 pasos antes de hacer una actualización (simula batch más grande) | |
learning_rate=2e-5, # Tasa de aprendizaje inicial | |
warmup_ratio=0.1, # Porcentaje de pasos de calentamiento (warmup) sobre el total de pasos de entrenamiento | |
weight_decay=0.1, # Regularización para evitar overfitting penalizando grandes pesos | |
lr_scheduler_type="cosine", # Tipo de scheduler para modificar la tasa de aprendizaje (coseno en este caso) | |
#fp16=True, # Usa precisión mixta (float16) para acelerar entrenamiento si hay soporte (ej. en GPUs) | |
evaluation_strategy="steps", | |
eval_steps=50, # Evalúa el modelo cada 50 pasos de entrenamiento | |
save_strategy="steps", | |
save_steps=50, # Guarda el modelo cada 50 pasos | |
save_total_limit=3, # Mantiene solo los últimos 3 checkpoints, borra los anteriores | |
logging_strategy="steps", | |
logging_dir='./multi-class-logs', # Carpeta donde se guardarán los logs de entrenamiento cada 50 pasos | |
logging_steps=50, # | |
load_best_model_at_end=True, # Carga automáticamente el mejor modelo evaluado al finalizar el entrenamiento | |
metric_for_best_model="f1", # Métrica que se usa para definir cuál fue el "mejor" modelo | |
greater_is_better=True | |
) | |
trainer = Trainer( | |
model=model, | |
args=training_args, | |
train_dataset=train_dataloader, | |
eval_dataset=test_dataloader, | |
compute_metrics=compute_metrics, | |
callbacks=[EarlyStoppingCallback(early_stopping_patience=3)] # Frena el entrenamiento si no mejora la métrica de evaluación después de 3 evaluaciones consecutiva | |
) | |
import os | |
os.environ["WANDB_API_KEY"] = "4fb9dff0336a34ab812e86f91b6c3a877cb25b36" | |
trainer.train() | |
q=[trainer.evaluate(eval_dataset=df2) for df2 in [train_dataloader, test_dataloader]] | |
pd.DataFrame(q, index=["train","test"]).iloc[:,:5] | |
def predict(text): | |
inputs = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors="pt").to("cuda") | |
outputs = model(**inputs) | |
probs = outputs[0].softmax(1) | |
pred_label_idx = probs.argmax() | |
pred_label = model.config.id2label[pred_label_idx.item()] | |
return probs, pred_label_idx, pred_label | |
text = "Hola" | |
predict(text) | |
model_path = "chatbot" | |
trainer.save_model(model_path) | |
tokenizer.save_pretrained(model_path) | |
model_path = "/content/chatbot" | |
model = BertForSequenceClassification.from_pretrained(model_path) | |
tokenizer= BertTokenizerFast.from_pretrained(model_path) | |
chatbot= pipeline("text-classification", model=model, tokenizer=tokenizer) | |
chatbot("Hola") | |
negaciones = [ | |
"no", "nunca", "nadie", "ningún", "ninguna", "nada", | |
"jamás", "jamas", "ni", "tampoco", "de ninguna manera", | |
"en absoluto", "en ningún caso", "no es cierto", | |
"no estoy de acuerdo", "no me parece", "no creo", | |
"no quiero", "no puedo", "no quiero hacerlo", "no acepto", "no gracias" | |
] | |
afirmaciones = [ | |
"sí", "si", "claro", "por supuesto", "entendido", "estoy de acuerdo", | |
"acepto", "exacto", "correcto", "eso es", "está bien", "esta bien", | |
"claro que sí", "lo creo", "es cierto", "sin duda", "así es", "claro que si" | |
"perfecto", "me parece bien", "seguro", "definitivamente", "por supuesto" | |
] | |
# Asumiendo que intents ya está definido | |
def obtener_respuesta_aleatoria(tag): | |
"""Busca el intent correspondiente al tag y devuelve una respuesta aleatoria.""" | |
for intent in intents['intents']: | |
if intent['tag'] == tag: | |
return random.choice(intent['responses']) | |
return "No tengo respuesta para eso." | |
# Asumiendo que intents ya está definido | |
def obtener_lista_de_respuesta(tag): | |
"""Busca el intent correspondiente al tag y devuelve una respuesta aleatoria.""" | |
for intent in intents['intents']: | |
if intent['tag'] == tag: | |
return intent['responses'] | |
return "No tengo respuesta para eso." | |
historial_respuestas = {} | |
def obtener_respuesta_sin_repetir(label): | |
global historial_respuestas | |
# Si es la primera vez que se usa el label, inicializar historial | |
if label not in historial_respuestas: | |
historial_respuestas[label] = [] | |
respuestas_posibles = obtener_lista_de_respuesta(label) # Lista de respuestas disponibles para el tag | |
# Filtrar respuestas que aún no se usaron | |
respuestas_disponibles = [r for r in respuestas_posibles if r not in historial_respuestas[label]] | |
if not respuestas_disponibles: | |
historial_respuestas[label] = [] # Resetear historial cuando se agoten todas | |
# Elegir una nueva respuesta sin repetir | |
nueva_respuesta = random.choice([r for r in respuestas_posibles if r not in historial_respuestas[label]]) | |
# Agregarla al historial | |
historial_respuestas[label].append(nueva_respuesta) | |
return nueva_respuesta | |
def corregir_preguntas(texto, idioma='es'): | |
""" | |
Corrige la ortografía y gramática del texto usando LanguageTool. | |
También ajusta signos de interrogación y acentos en palabras interrogativas. | |
Ignora correcciones sobre las palabras: 'unaj', 'arturo', 'jauretche'. | |
""" | |
try: | |
# Agregar "?" si el texto tiene 3 o más palabras y no termina con "?" | |
if len(texto.split()) >= 3 and not texto.endswith("?"): | |
texto += "?" | |
# Palabras a ignorar (en minúsculas) | |
palabras_ignorar = ["unaj", "arturo", "jauretche", "profode"] | |
reemplazos = {} | |
# Reemplazar palabras ignoradas por marcadores temporales | |
def reemplazar_ignoradas(match): | |
palabra = match.group(0) | |
marcador = f"__IGNORAR_{len(reemplazos)}__" | |
reemplazos[marcador] = palabra | |
return marcador | |
patron_ignorar = re.compile(r'\b(' + '|'.join(palabras_ignorar) + r')\b', re.IGNORECASE) | |
texto_temporal = patron_ignorar.sub(reemplazar_ignoradas, texto) | |
# Inicializar el corrector de LanguageTool | |
tool = language_tool_python.LanguageToolPublicAPI(idioma) | |
# Obtener las correcciones sugeridas | |
texto_corregido = tool.correct(texto_temporal) | |
# Restaurar las palabras ignoradas | |
for marcador, palabra_original in reemplazos.items(): | |
texto_corregido = texto_corregido.replace(marcador, palabra_original) | |
# Diccionario con palabras interrogativas y sus versiones acentuadas | |
palabras_interrogativas = { | |
"como": "cómo", "cuando": "cuándo", "donde": "dónde", "que": "qué", | |
"quien": "quién", "cual": "cuál", "cuanto": "cuánto" | |
} | |
# Si la oración es interrogativa, corregir solo la primera palabra interrogativa | |
if texto_corregido.endswith("?"): | |
palabras = texto_corregido[:-1].split() # Remover "?" y dividir en palabras | |
primera_encontrada = False | |
for i, palabra in enumerate(palabras): | |
palabra_limpia = palabra.lower().strip("¿") # Remover el signo "¿" si existe | |
# Si es una palabra interrogativa y es la primera encontrada, corregir | |
if palabra_limpia in palabras_interrogativas: | |
if not primera_encontrada: | |
palabras[i] = palabras_interrogativas[palabra_limpia] | |
primera_encontrada = True | |
texto_corregido = " ".join(palabras) + "?" | |
# Asegurar que la oración comienza con "¿" | |
if not texto_corregido.startswith("¿"): | |
texto_corregido = "¿" + texto_corregido | |
# Mantener solo el último signo "¿" y eliminar los anteriores | |
if texto_corregido.count("¿") > 1: | |
ultima_pos = texto_corregido.rfind("¿") | |
texto_corregido = texto_corregido[:ultima_pos].replace("¿", "") + texto_corregido[ultima_pos:] | |
return texto_corregido | |
except Exception: | |
return texto | |
def normalizar_clave(texto): | |
reemplazos = { | |
"á": "a", "é": "e", "í": "i", "ó": "o", "ú": "u", | |
"ä": "a", "ë": "e", "ï": "i", "ö": "o", "ü": "u" | |
} | |
for acentuada, normal in reemplazos.items(): | |
texto = texto.replace(acentuada, normal) | |
return texto.strip().replace(" ", "+") | |
estado_chatbot = { | |
"esperando_confirmacion": False, | |
"opciones": [], | |
"texto_original": "" | |
} | |
def obtener_respuesta_chatbot(text): | |
global estado_chatbot | |
# Si no está esperando confirmación, resetea su estado antes de procesar la nueva consulta | |
if not estado_chatbot["esperando_confirmacion"]: | |
estado_chatbot["opciones"] = [] # Limpia opciones anteriores | |
estado_chatbot["texto_original"] = "" # Resetea el estado previo | |
if estado_chatbot["esperando_confirmacion"]: | |
if text.lower() in afirmaciones: | |
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado | |
respuesta = obtener_respuesta_sin_repetir(estado_chatbot["opciones"][0]["label"]) | |
estado_chatbot["opciones"] = [] # Limpia opciones anteriores | |
estado_chatbot["texto_original"] = "" # Resetea el estado previo | |
return respuesta | |
elif text.lower() in negaciones: | |
if len(estado_chatbot["opciones"]) > 1: | |
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado | |
return obtener_respuesta_sin_repetir(estado_chatbot["opciones"][1]["label"]) | |
else: | |
estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado | |
return "No tengo más opciones, ¿podés reformular la pregunta?" | |
else: | |
return "Por favor, respondé 'sí' o 'no'." | |
prediction = chatbot(text) | |
prediction = sorted(prediction, key=lambda x: x["score"], reverse=True) # Ordenar por score | |
label_principal = prediction[0]["label"] | |
score_principal = prediction[0]["score"] | |
if score_principal >= 0.1: | |
print("(Grado de seguridad en la respuesta: ", score_principal, ")") | |
return obtener_respuesta_sin_repetir(label_principal) | |
elif 0.20 <= score_principal < 0.4: | |
estado_chatbot["esperando_confirmacion"] = True | |
estado_chatbot["opciones"] = prediction[:2] | |
estado_chatbot["texto_original"] = text | |
opciones_texto = ", ".join( | |
[f"{alt['label']} ({alt['score']:.2f})" for alt in estado_chatbot["opciones"]] | |
) | |
return f"No estoy seguro de la respuesta correscta. ¿Te referís a alguna de estas opciones? Opción 1: {opciones_texto} (Si/No)" | |
else: | |
print(f"No tengo una respuesta precisa. ¿Puedes decirme una palabra clave de tu pregunta para que pueda ayudarte? Ingresa una o dos palabras:") | |
clave = "info" | |
clave = normalizar_clave(clave) | |
# Verificar si la palabra clave no es una negación | |
if clave not in negaciones: | |
# Si no es una negación, proporcionar el enlace | |
return f"Prueba consultando el siguiente enlace: https://www.unaj.edu.ar/?s={clave} o reformula tu pregunta. Escribela a continuación:" | |
else: | |
# Si la clave es una negación, podrías manejarlo aquí | |
return "Comprendo, intenta reformular tu pregunta por favor para que pueda entenderla. Prueba usando frases cortas y claras." | |
faq = ['¿Cuándo puedo inscribirme a carreras?', '¿Qué carreras tiene la UNAJ?', '¿Qué posgrados tiene la UNAJ?', '¿Qué cursos de oficios tiene la UNAJ y cómo puedo inscribirme?', '¿Qué otras propuestas de formación ofrece la UNAJ?', '¿Puedo estudiar idiomas en la UNAJ?', '¿Qué hago si no puedo ingresar al SIU GUARANÍ?', '¿Cuándo comienza y termina el cuatrimestre?', '¿Dónde encuentro el calendario académico?', '¿Cuándo puedo reincorporarme?', '¿Cuándo puedo cambiar de carrera?', '¿Cómo pido equivalencias?', '¿Cómo pido una licencia estudiantil?'] | |
for f in faq: | |
print(f) | |
print(obtener_respuesta_chatbot(corregir_preguntas(f.lower()))) | |
print("-------------------------------------------------------------------------------------------------------------------------------------") | |
# Instalar librerías necesarias | |
import gradio as gr | |
from pyngrok import ngrok | |
import pandas as pd | |
from datetime import datetime | |
from bs4 import BeautifulSoup | |
# Nombre del archivo CSV | |
feedback_file = "feedback.csv" | |
# Inicializar CSV si no existe | |
def init_csv(): | |
try: | |
pd.read_csv(feedback_file) | |
except FileNotFoundError: | |
df = pd.DataFrame(columns=["timestamp", "question", "response", "feedback"]) | |
df.to_csv(feedback_file, index=False) | |
# Extrae el texto de la respuesta HTML | |
def limpiar_html(texto_html): | |
return BeautifulSoup(texto_html, "html.parser").get_text() | |
# Guardar el feedback cuando se hace clic en "Nuevo Mensaje" | |
def guardar_feedback(question, response, feedback): | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
response_limpia = limpiar_html(response) | |
new_data = pd.DataFrame([[timestamp, question, response_limpia, feedback]], | |
columns=["timestamp", "question", "response", "feedback"]) | |
new_data.to_csv(feedback_file, mode='a', header=False, index=False) | |
return "", "", 10, gr.update(interactive=False) # limpia todo y desactiva "Nuevo Mensaje" | |
# Convierte las URL en hipervinculos clickeables | |
def convertir_urls_a_links(texto): | |
# Expresión regular para encontrar URLs | |
url_pattern = r"(https?://[^\s]+)" | |
# Reemplaza cada URL por una etiqueta <a> | |
return re.sub(url_pattern, r'<a href="\1" target="_blank">\1</a>', texto) | |
# Simulación de respuesta del chatbot (reemplazar por tu modelo real) | |
def chatbot_response(question): | |
mensaje_corregido = corregir_preguntas(question) | |
respuesta_raw = obtener_respuesta_chatbot(mensaje_corregido) | |
respuesta = convertir_urls_a_links(respuesta_raw) | |
# Agrega un contenedor con estilo para simular una caja | |
respuesta_contenedor = f""" | |
<div style='background-color:#2b2b2b; color:#f1f1f1; border:0px solid #515057; | |
padding:10px; border-radius:5px; white-space:pre-wrap'> | |
{respuesta} | |
</div> | |
""" | |
return respuesta_contenedor, gr.update(visible=True, interactive=True) | |
# Inicializamos el CSV | |
init_csv() | |
# Interfaz Gradio | |
with gr.Blocks(css=""" | |
body { | |
background-color: black; | |
color: #00ffff; | |
} | |
.gr-button { | |
background-color: #00ffff !important; | |
color: black !important; | |
} | |
.gr-textbox textarea, .gr-number input { | |
background-color: #111; | |
color: #00ffff; | |
border: 1px solid #00ffff; | |
} | |
""") as demo: | |
with gr.Row(): | |
gr.HTML("<div style='flex:1'></div><img src='https://guarani.unaj.edu.ar/_comp/unaj/img/logo-transparente.png' height='60px' style='margin:10px'/>") | |
gr.Markdown("# Chatbot UNAJ\n## Hola! Soy Arturito, el bot de la UNAJ y estoy para responder tus preguntas sobre la Universidad Nacional Arturo Jauretche") | |
question_input = gr.Textbox(label="Mensaje", placeholder="Escribí tu consulta...") | |
submit_btn = gr.Button("Enviar Mensaje") | |
# Se usa un HTML para que los links de la respuesta sean clickeables | |
response_output = gr.HTML() | |
# Se coloca un slider que permite captar el feedback de la respuesta | |
feedback_slider = gr.Slider(minimum=1, maximum=10, value=10, step=1, | |
label="¿Qué tan útil fue la respuesta? (1 = Nada útil, 10 = Muy útil)", interactive=True) | |
# Aparece un boton para "Nuevo Mensaje" que limpia el cuadro de "Mensaje" y guarda la respuesta y puntuación. | |
new_message_btn = gr.Button("Nuevo Mensaje", visible=False) | |
# Evento al hacer clic en "Enviar Mensaje" | |
submit_btn.click(fn=chatbot_response, | |
inputs=question_input, | |
outputs=[response_output, new_message_btn]) | |
# Evento al hacer clic en "Nuevo Mensaje" | |
new_message_btn.click(fn=guardar_feedback, | |
inputs=[question_input, response_output, feedback_slider], | |
outputs=[question_input, response_output, feedback_slider, new_message_btn]) | |
demo.launch(share=True) |