# -*- coding: utf-8 -*- """Copia de Modelo_Chatbot_Final_3_con_Gradio.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1sFAltehLtdNpHoQVsiDeikgSJkIDTWrD """ import warnings warnings.filterwarnings('ignore') import json import numpy as np import pandas as pd import random from matplotlib import pyplot as plt import seaborn as sns from wordcloud import WordCloud,STOPWORDS import missingno as msno from sklearn.feature_extraction.text import CountVectorizer from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, precision_recall_fscore_support #from keras.preprocessing import text import keras from keras.models import Sequential from keras.layers import Dense,Embedding,LSTM,Dropout from keras.callbacks import ReduceLROnPlateau from tensorflow.keras.preprocessing.sequence import pad_sequences import nltk from nltk import word_tokenize from nltk.stem import PorterStemmer import torch from torch.utils.data import Dataset from transformers import AutoTokenizer, TFAutoModelForSequenceClassification from transformers import pipeline from transformers import DistilBertTokenizerFast from transformers import BertForSequenceClassification, BertTokenizerFast from transformers import TFDistilBertForSequenceClassification, Trainer, TFTrainingArguments from transformers import BertTokenizer, TFBertForSequenceClassification, BertConfig from transformers import TrainingArguments, Trainer, EarlyStoppingCallback import re import language_tool_python import logging import spacy def load_json_file(filename): with open(filename) as f: file = json.load(f) return file filename = 'intents_aumentado.json' intents = load_json_file(filename) def create_df(): df = pd.DataFrame({ 'Pattern' : [], 'Tag' : [] }) return df df = create_df() df def extract_json_info(json_file, df): for intent in json_file['intents']: for pattern in intent['patterns']: sentence_tag = [pattern, intent['tag']] df.loc[len(df.index)] = sentence_tag return df df = extract_json_info(intents, df) df.head() df2 = df.copy() df2.head() import nltk nltk.download('punkt_tab') stemmer = PorterStemmer() ignore_words=['?', '!', ',', '.'] def preprocess_pattern(pattern): words = word_tokenize(pattern.lower()) stemmed_words = [stemmer.stem(word) for word in words if word not in ignore_words] return " ".join(stemmed_words) df['Pattern'] = df['Pattern'].apply(preprocess_pattern) df2.head() labels = df2['Tag'].unique().tolist() labels = [s.strip() for s in labels] labels num_labels = len(labels) id2label = {id:label for id, label in enumerate(labels)} label2id = {label:id for id, label in enumerate(labels)} id2label label2id df2['labels'] = df2['Tag'].map(lambda x: label2id[x.strip()]) df2.head() X = list(df2['Pattern']) X[:5] y = list(df2['labels']) y[:5] X_train,X_test,y_train,y_test = train_test_split(X,y,random_state = 123) model_name = "dccuchile/bert-base-spanish-wwm-cased" max_len = 256 tokenizer = BertTokenizer.from_pretrained(model_name, max_length=max_len) model = BertForSequenceClassification.from_pretrained(model_name, num_labels=num_labels, id2label=id2label, label2id = label2id) train_encoding = tokenizer(X_train, truncation=True, padding=True) test_encoding = tokenizer(X_test, truncation=True, padding=True) full_data = tokenizer(X, truncation=True, padding=True) class DataLoader(Dataset): def __init__(self, encodings, labels): self.encodings = encodings self.labels = labels def __getitem__(self, idx): item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} item['labels'] = torch.tensor(self.labels[idx]) return item def __len__(self): return len(self.labels) train_dataloader = DataLoader(train_encoding, y_train) test_dataloader = DataLoader(test_encoding, y_test) fullDataLoader = DataLoader(full_data, y_test) def compute_metrics(pred): labels = pred.label_ids preds = pred.predictions.argmax(-1) precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='macro') acc = accuracy_score(labels, preds) return { 'accuracy': acc, 'f1': f1, 'precision': precision, 'recall': recall } # Parametros finales del modelo training_args = TrainingArguments( output_dir='./output', # Carpeta donde se guardarán los modelos entrenados y checkpoints do_train=True, do_eval=True, num_train_epochs=30, # Número total de épocas (pasadas completas por el dataset de entrenamiento) per_device_train_batch_size=8, # Tamaño del batch de entrenamiento por dispositivo (GPU o CPU) per_device_eval_batch_size=32, # Tamaño del batch de evaluación por dispositivo (mayor para evaluar más rápido) gradient_accumulation_steps=4, # Acumula gradientes por 4 pasos antes de hacer una actualización (simula batch más grande) learning_rate=2e-5, # Tasa de aprendizaje inicial warmup_ratio=0.1, # Porcentaje de pasos de calentamiento (warmup) sobre el total de pasos de entrenamiento weight_decay=0.1, # Regularización para evitar overfitting penalizando grandes pesos lr_scheduler_type="cosine", # Tipo de scheduler para modificar la tasa de aprendizaje (coseno en este caso) #fp16=True, # Usa precisión mixta (float16) para acelerar entrenamiento si hay soporte (ej. en GPUs) evaluation_strategy="steps", eval_steps=50, # Evalúa el modelo cada 50 pasos de entrenamiento save_strategy="steps", save_steps=50, # Guarda el modelo cada 50 pasos save_total_limit=3, # Mantiene solo los últimos 3 checkpoints, borra los anteriores logging_strategy="steps", logging_dir='./multi-class-logs', # Carpeta donde se guardarán los logs de entrenamiento cada 50 pasos logging_steps=50, # load_best_model_at_end=True, # Carga automáticamente el mejor modelo evaluado al finalizar el entrenamiento metric_for_best_model="f1", # Métrica que se usa para definir cuál fue el "mejor" modelo greater_is_better=True ) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataloader, eval_dataset=test_dataloader, compute_metrics=compute_metrics, callbacks=[EarlyStoppingCallback(early_stopping_patience=3)] # Frena el entrenamiento si no mejora la métrica de evaluación después de 3 evaluaciones consecutiva ) import os os.environ["WANDB_API_KEY"] = "4fb9dff0336a34ab812e86f91b6c3a877cb25b36" trainer.train() q=[trainer.evaluate(eval_dataset=df2) for df2 in [train_dataloader, test_dataloader]] pd.DataFrame(q, index=["train","test"]).iloc[:,:5] def predict(text): inputs = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors="pt").to("cuda") outputs = model(**inputs) probs = outputs[0].softmax(1) pred_label_idx = probs.argmax() pred_label = model.config.id2label[pred_label_idx.item()] return probs, pred_label_idx, pred_label text = "Hola" predict(text) model_path = "chatbot" trainer.save_model(model_path) tokenizer.save_pretrained(model_path) model_path = "/content/chatbot" model = BertForSequenceClassification.from_pretrained(model_path) tokenizer= BertTokenizerFast.from_pretrained(model_path) chatbot= pipeline("text-classification", model=model, tokenizer=tokenizer) chatbot("Hola") negaciones = [ "no", "nunca", "nadie", "ningún", "ninguna", "nada", "jamás", "jamas", "ni", "tampoco", "de ninguna manera", "en absoluto", "en ningún caso", "no es cierto", "no estoy de acuerdo", "no me parece", "no creo", "no quiero", "no puedo", "no quiero hacerlo", "no acepto", "no gracias" ] afirmaciones = [ "sí", "si", "claro", "por supuesto", "entendido", "estoy de acuerdo", "acepto", "exacto", "correcto", "eso es", "está bien", "esta bien", "claro que sí", "lo creo", "es cierto", "sin duda", "así es", "claro que si" "perfecto", "me parece bien", "seguro", "definitivamente", "por supuesto" ] # Asumiendo que intents ya está definido def obtener_respuesta_aleatoria(tag): """Busca el intent correspondiente al tag y devuelve una respuesta aleatoria.""" for intent in intents['intents']: if intent['tag'] == tag: return random.choice(intent['responses']) return "No tengo respuesta para eso." # Asumiendo que intents ya está definido def obtener_lista_de_respuesta(tag): """Busca el intent correspondiente al tag y devuelve una respuesta aleatoria.""" for intent in intents['intents']: if intent['tag'] == tag: return intent['responses'] return "No tengo respuesta para eso." historial_respuestas = {} def obtener_respuesta_sin_repetir(label): global historial_respuestas # Si es la primera vez que se usa el label, inicializar historial if label not in historial_respuestas: historial_respuestas[label] = [] respuestas_posibles = obtener_lista_de_respuesta(label) # Lista de respuestas disponibles para el tag # Filtrar respuestas que aún no se usaron respuestas_disponibles = [r for r in respuestas_posibles if r not in historial_respuestas[label]] if not respuestas_disponibles: historial_respuestas[label] = [] # Resetear historial cuando se agoten todas # Elegir una nueva respuesta sin repetir nueva_respuesta = random.choice([r for r in respuestas_posibles if r not in historial_respuestas[label]]) # Agregarla al historial historial_respuestas[label].append(nueva_respuesta) return nueva_respuesta def corregir_preguntas(texto, idioma='es'): """ Corrige la ortografía y gramática del texto usando LanguageTool. También ajusta signos de interrogación y acentos en palabras interrogativas. Ignora correcciones sobre las palabras: 'unaj', 'arturo', 'jauretche'. """ try: # Agregar "?" si el texto tiene 3 o más palabras y no termina con "?" if len(texto.split()) >= 3 and not texto.endswith("?"): texto += "?" # Palabras a ignorar (en minúsculas) palabras_ignorar = ["unaj", "arturo", "jauretche", "profode"] reemplazos = {} # Reemplazar palabras ignoradas por marcadores temporales def reemplazar_ignoradas(match): palabra = match.group(0) marcador = f"__IGNORAR_{len(reemplazos)}__" reemplazos[marcador] = palabra return marcador patron_ignorar = re.compile(r'\b(' + '|'.join(palabras_ignorar) + r')\b', re.IGNORECASE) texto_temporal = patron_ignorar.sub(reemplazar_ignoradas, texto) # Inicializar el corrector de LanguageTool tool = language_tool_python.LanguageToolPublicAPI(idioma) # Obtener las correcciones sugeridas texto_corregido = tool.correct(texto_temporal) # Restaurar las palabras ignoradas for marcador, palabra_original in reemplazos.items(): texto_corregido = texto_corregido.replace(marcador, palabra_original) # Diccionario con palabras interrogativas y sus versiones acentuadas palabras_interrogativas = { "como": "cómo", "cuando": "cuándo", "donde": "dónde", "que": "qué", "quien": "quién", "cual": "cuál", "cuanto": "cuánto" } # Si la oración es interrogativa, corregir solo la primera palabra interrogativa if texto_corregido.endswith("?"): palabras = texto_corregido[:-1].split() # Remover "?" y dividir en palabras primera_encontrada = False for i, palabra in enumerate(palabras): palabra_limpia = palabra.lower().strip("¿") # Remover el signo "¿" si existe # Si es una palabra interrogativa y es la primera encontrada, corregir if palabra_limpia in palabras_interrogativas: if not primera_encontrada: palabras[i] = palabras_interrogativas[palabra_limpia] primera_encontrada = True texto_corregido = " ".join(palabras) + "?" # Asegurar que la oración comienza con "¿" if not texto_corregido.startswith("¿"): texto_corregido = "¿" + texto_corregido # Mantener solo el último signo "¿" y eliminar los anteriores if texto_corregido.count("¿") > 1: ultima_pos = texto_corregido.rfind("¿") texto_corregido = texto_corregido[:ultima_pos].replace("¿", "") + texto_corregido[ultima_pos:] return texto_corregido except Exception: return texto def normalizar_clave(texto): reemplazos = { "á": "a", "é": "e", "í": "i", "ó": "o", "ú": "u", "ä": "a", "ë": "e", "ï": "i", "ö": "o", "ü": "u" } for acentuada, normal in reemplazos.items(): texto = texto.replace(acentuada, normal) return texto.strip().replace(" ", "+") estado_chatbot = { "esperando_confirmacion": False, "opciones": [], "texto_original": "" } def obtener_respuesta_chatbot(text): global estado_chatbot # Si no está esperando confirmación, resetea su estado antes de procesar la nueva consulta if not estado_chatbot["esperando_confirmacion"]: estado_chatbot["opciones"] = [] # Limpia opciones anteriores estado_chatbot["texto_original"] = "" # Resetea el estado previo if estado_chatbot["esperando_confirmacion"]: if text.lower() in afirmaciones: estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado respuesta = obtener_respuesta_sin_repetir(estado_chatbot["opciones"][0]["label"]) estado_chatbot["opciones"] = [] # Limpia opciones anteriores estado_chatbot["texto_original"] = "" # Resetea el estado previo return respuesta elif text.lower() in negaciones: if len(estado_chatbot["opciones"]) > 1: estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado return obtener_respuesta_sin_repetir(estado_chatbot["opciones"][1]["label"]) else: estado_chatbot["esperando_confirmacion"] = False # Se resetea el estado return "No tengo más opciones, ¿podés reformular la pregunta?" else: return "Por favor, respondé 'sí' o 'no'." prediction = chatbot(text) prediction = sorted(prediction, key=lambda x: x["score"], reverse=True) # Ordenar por score label_principal = prediction[0]["label"] score_principal = prediction[0]["score"] if score_principal >= 0.1: print("(Grado de seguridad en la respuesta: ", score_principal, ")") return obtener_respuesta_sin_repetir(label_principal) elif 0.20 <= score_principal < 0.4: estado_chatbot["esperando_confirmacion"] = True estado_chatbot["opciones"] = prediction[:2] estado_chatbot["texto_original"] = text opciones_texto = ", ".join( [f"{alt['label']} ({alt['score']:.2f})" for alt in estado_chatbot["opciones"]] ) return f"No estoy seguro de la respuesta correscta. ¿Te referís a alguna de estas opciones? Opción 1: {opciones_texto} (Si/No)" else: print(f"No tengo una respuesta precisa. ¿Puedes decirme una palabra clave de tu pregunta para que pueda ayudarte? Ingresa una o dos palabras:") clave = "info" clave = normalizar_clave(clave) # Verificar si la palabra clave no es una negación if clave not in negaciones: # Si no es una negación, proporcionar el enlace return f"Prueba consultando el siguiente enlace: https://www.unaj.edu.ar/?s={clave} o reformula tu pregunta. Escribela a continuación:" else: # Si la clave es una negación, podrías manejarlo aquí return "Comprendo, intenta reformular tu pregunta por favor para que pueda entenderla. Prueba usando frases cortas y claras." faq = ['¿Cuándo puedo inscribirme a carreras?', '¿Qué carreras tiene la UNAJ?', '¿Qué posgrados tiene la UNAJ?', '¿Qué cursos de oficios tiene la UNAJ y cómo puedo inscribirme?', '¿Qué otras propuestas de formación ofrece la UNAJ?', '¿Puedo estudiar idiomas en la UNAJ?', '¿Qué hago si no puedo ingresar al SIU GUARANÍ?', '¿Cuándo comienza y termina el cuatrimestre?', '¿Dónde encuentro el calendario académico?', '¿Cuándo puedo reincorporarme?', '¿Cuándo puedo cambiar de carrera?', '¿Cómo pido equivalencias?', '¿Cómo pido una licencia estudiantil?'] for f in faq: print(f) print(obtener_respuesta_chatbot(corregir_preguntas(f.lower()))) print("-------------------------------------------------------------------------------------------------------------------------------------") # Instalar librerías necesarias import gradio as gr from pyngrok import ngrok import pandas as pd from datetime import datetime from bs4 import BeautifulSoup # Nombre del archivo CSV feedback_file = "feedback.csv" # Inicializar CSV si no existe def init_csv(): try: pd.read_csv(feedback_file) except FileNotFoundError: df = pd.DataFrame(columns=["timestamp", "question", "response", "feedback"]) df.to_csv(feedback_file, index=False) # Extrae el texto de la respuesta HTML def limpiar_html(texto_html): return BeautifulSoup(texto_html, "html.parser").get_text() # Guardar el feedback cuando se hace clic en "Nuevo Mensaje" def guardar_feedback(question, response, feedback): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") response_limpia = limpiar_html(response) new_data = pd.DataFrame([[timestamp, question, response_limpia, feedback]], columns=["timestamp", "question", "response", "feedback"]) new_data.to_csv(feedback_file, mode='a', header=False, index=False) return "", "", 10, gr.update(interactive=False) # limpia todo y desactiva "Nuevo Mensaje" # Convierte las URL en hipervinculos clickeables def convertir_urls_a_links(texto): # Expresión regular para encontrar URLs url_pattern = r"(https?://[^\s]+)" # Reemplaza cada URL por una etiqueta return re.sub(url_pattern, r'\1', texto) # Simulación de respuesta del chatbot (reemplazar por tu modelo real) def chatbot_response(question): mensaje_corregido = corregir_preguntas(question) respuesta_raw = obtener_respuesta_chatbot(mensaje_corregido) respuesta = convertir_urls_a_links(respuesta_raw) # Agrega un contenedor con estilo para simular una caja respuesta_contenedor = f"""
{respuesta}
""" return respuesta_contenedor, gr.update(visible=True, interactive=True) # Inicializamos el CSV init_csv() # Interfaz Gradio with gr.Blocks(css=""" body { background-color: black; color: #00ffff; } .gr-button { background-color: #00ffff !important; color: black !important; } .gr-textbox textarea, .gr-number input { background-color: #111; color: #00ffff; border: 1px solid #00ffff; } """) as demo: with gr.Row(): gr.HTML("
") gr.Markdown("# Chatbot UNAJ\n## Hola! Soy Arturito, el bot de la UNAJ y estoy para responder tus preguntas sobre la Universidad Nacional Arturo Jauretche") question_input = gr.Textbox(label="Mensaje", placeholder="Escribí tu consulta...") submit_btn = gr.Button("Enviar Mensaje") # Se usa un HTML para que los links de la respuesta sean clickeables response_output = gr.HTML() # Se coloca un slider que permite captar el feedback de la respuesta feedback_slider = gr.Slider(minimum=1, maximum=10, value=10, step=1, label="¿Qué tan útil fue la respuesta? (1 = Nada útil, 10 = Muy útil)", interactive=True) # Aparece un boton para "Nuevo Mensaje" que limpia el cuadro de "Mensaje" y guarda la respuesta y puntuación. new_message_btn = gr.Button("Nuevo Mensaje", visible=False) # Evento al hacer clic en "Enviar Mensaje" submit_btn.click(fn=chatbot_response, inputs=question_input, outputs=[response_output, new_message_btn]) # Evento al hacer clic en "Nuevo Mensaje" new_message_btn.click(fn=guardar_feedback, inputs=[question_input, response_output, feedback_slider], outputs=[question_input, response_output, feedback_slider, new_message_btn]) demo.launch(share=True)