Spaces:
Running
Running
#/modules/database/semantic_mongo_db.py | |
#/modules/database/semantic_mongo_db.py | |
from .mongo_db import insert_document, find_documents, update_document, delete_document | |
from datetime import datetime, timezone | |
import logging | |
logger = logging.getLogger(__name__) | |
COLLECTION_NAME = 'student_semantic_analysis' | |
def store_student_semantic_result(username, text, analysis_result): | |
""" | |
Guarda el resultado del análisis semántico en MongoDB. | |
Args: | |
username: Nombre del usuario | |
text: Texto analizado | |
analysis_result: Resultado del análisis que incluye: | |
- key_concepts: Lista de conceptos clave y sus frecuencias | |
- concept_graph: Gráfico de relaciones conceptuales en bytes | |
- entities: Diccionario de entidades identificadas | |
- entity_graph: Gráfico de relaciones entre entidades en bytes | |
""" | |
try: | |
analysis_document = { | |
'username': username, | |
'timestamp': datetime.now(timezone.utc).isoformat(), | |
'text': text, | |
'analysis_type': 'semantic', | |
'key_concepts': analysis_result['key_concepts'], | |
'concept_graph': analysis_result['concept_graph'], | |
'entities': analysis_result['entities'], | |
'entity_graph': analysis_result['entity_graph'] | |
} | |
result = insert_document(COLLECTION_NAME, analysis_document) | |
if result: | |
logger.info(f"Análisis semántico guardado con ID: {result} para el usuario: {username}") | |
return True | |
return False | |
except Exception as e: | |
logger.error(f"Error al guardar el análisis semántico: {str(e)}") | |
return False | |
def get_student_semantic_analysis(username, limit=10): | |
""" | |
Recupera los análisis semánticos de un estudiante. | |
Args: | |
username: Nombre del usuario | |
limit: Número máximo de análisis a retornar | |
Returns: | |
list: Lista de análisis semánticos | |
""" | |
query = {"username": username, "analysis_type": "semantic"} | |
return find_documents(COLLECTION_NAME, query, sort=[("timestamp", -1)], limit=limit) | |
def update_student_semantic_analysis(analysis_id, update_data): | |
""" | |
Actualiza un análisis semántico existente. | |
Args: | |
analysis_id: ID del análisis a actualizar | |
update_data: Datos a actualizar | |
""" | |
query = {"_id": analysis_id} | |
update = {"$set": update_data} | |
return update_document(COLLECTION_NAME, query, update) | |
def delete_student_semantic_analysis(analysis_id): | |
""" | |
Elimina un análisis semántico. | |
Args: | |
analysis_id: ID del análisis a eliminar | |
""" | |
query = {"_id": analysis_id} | |
return delete_document(COLLECTION_NAME, query) | |
def get_student_semantic_data(username): | |
""" | |
Obtiene todos los análisis semánticos de un estudiante. | |
Args: | |
username: Nombre del usuario | |
Returns: | |
dict: Diccionario con todos los análisis del estudiante | |
""" | |
analyses = get_student_semantic_analysis(username, limit=None) | |
formatted_analyses = [] | |
for analysis in analyses: | |
formatted_analysis = { | |
'timestamp': analysis['timestamp'], | |
'text': analysis['text'], | |
'key_concepts': analysis['key_concepts'], | |
'entities': analysis['entities'] | |
# No incluimos los gráficos en el resumen general | |
} | |
formatted_analyses.append(formatted_analysis) | |
return { | |
'entries': formatted_analyses | |
} | |
# Exportar las funciones necesarias | |
__all__ = [ | |
'store_student_semantic_result', | |
'get_student_semantic_analysis', | |
'update_student_semantic_analysis', | |
'delete_student_semantic_analysis', | |
'get_student_semantic_data' | |
] |