import os import logging from typing import Optional from pydantic import BaseModel from fastapi import FastAPI, HTTPException import rdflib from rdflib import RDF, RDFS, OWL from huggingface_hub import InferenceClient from sentence_transformers import SentenceTransformer import faiss import json import numpy as np logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("app.log"), logging.StreamHandler()] ) logger = logging.getLogger(__name__) API_KEY = os.getenv("HF_API_KEY") if not API_KEY: logger.error("HF_API_KEY non impostata.") raise EnvironmentError("HF_API_KEY non impostata.") client = InferenceClient(api_key=API_KEY) RDF_FILE = "Ontologia.rdf" HF_MODEL = "Qwen/Qwen2.5-72B-Instruct" MAX_CLASSES = 30 MAX_PROPERTIES = 30 # Carica i documenti e l'indice FAISS with open("data/documents.json", "r", encoding="utf-8") as f: documents = json.load(f) index = faiss.read_index("data/faiss.index") model = SentenceTransformer('all-MiniLM-L6-v2') def retrieve_relevant_documents(query: str, top_k: int = 5): query_embedding = model.encode([query], convert_to_numpy=True) distances, indices = index.search(query_embedding, top_k) relevant_docs = [documents[idx] for idx in indices[0]] return relevant_docs def extract_classes_and_properties(rdf_file:str) -> str: """ Carica l'ontologia e crea un 'sunto' di Classi e Proprietà (senza NamedIndividuals) per ridurre i token. """ if not os.path.exists(rdf_file): return "NO_RDF_FILE" g = rdflib.Graph() try: g.parse(rdf_file, format="xml") except Exception as e: logger.error(f"Parsing RDF error: {e}") return "PARSING_ERROR" # Troviamo le classi classes_found = set() for s in g.subjects(RDF.type, OWL.Class): classes_found.add(s) for s in g.subjects(RDF.type, RDFS.Class): classes_found.add(s) classes_list = sorted(str(c) for c in classes_found) classes_list = classes_list[:MAX_CLASSES] # Troviamo le proprietà props_found = set() for p in g.subjects(RDF.type, OWL.ObjectProperty): props_found.add(p) for p in g.subjects(RDF.type, OWL.DatatypeProperty): props_found.add(p) for p in g.subjects(RDF.type, RDF.Property): props_found.add(p) props_list = sorted(str(x) for x in props_found) props_list = props_list[:MAX_PROPERTIES] txt_classes = "\n".join([f"- CLASSE: {c}" for c in classes_list]) txt_props = "\n".join([f"- PROPRIETA': {p}" for p in props_list]) summary = f"""\ # CLASSI (max {MAX_CLASSES}) {txt_classes} # PROPRIETA' (max {MAX_PROPERTIES}) {txt_props} """ return summary knowledge_text = extract_classes_and_properties(RDF_FILE) def create_system_message(ont_text:str, retrieved_docs:str)->str: """ Prompt di sistema robusto, con regole su query in una riga e informazioni recuperate tramite RAG. """ return f""" Sei un assistente museale. Ecco un estratto di CLASSI e PROPRIETA' dell'ontologia (senza NamedIndividuals): --- ONTOLOGIA --- {ont_text} --- FINE --- Ecco alcune informazioni rilevanti recuperate dalla base di conoscenza: {retrieved_docs} Suggerimento: se l'utente chiede il 'materiale' di un'opera, potresti usare qualcosa come 'base:materialeOpera' o un'altra proprietà simile (se esiste). Non è tassativo: usa la proprietà che ritieni più affine se ci sono riferimenti in ontologia. REGOLE STRINGENTI: 1) Se l'utente chiede info su questa ontologia, genera SEMPRE una query SPARQL in UNA SOLA RIGA, con prefix: PREFIX base: 2) Se la query produce 0 risultati o fallisce, ritenta con un secondo tentativo. 3) Se la domanda è generica (tipo 'Ciao, come stai?'), rispondi breve. 4) Se trovi risultati, risposta finale = la query SPARQL (una sola riga). 5) Se non trovi nulla, di' 'Nessuna info.' 6) Non multiline. Esempio: PREFIX base: <...> SELECT ?x WHERE { ... }. FINE REGOLE """ def create_explanation_prompt(results_str:str)->str: return f""" Ho ottenuto questi risultati SPARQL: {results_str} Ora fornisci una breve spiegazione museale (massimo ~10 righe), senza inventare oltre i risultati. """ async def call_hf_model(messages, temperature=0.5, max_tokens=1024)->str: logger.debug("Chiamo HF con i seguenti messaggi:") for m in messages: logger.debug(f"ROLE={m['role']} => {m['content'][:300]}") try: resp = client.chat.completions.create( model=HF_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens, top_p=0.9 ) raw=resp["choices"][0]["message"]["content"] # Forziamo la query su linea singola se multiline single_line = " ".join(raw.splitlines()) logger.debug(f"Risposta HF single-line: {single_line}") return single_line.strip() except Exception as e: logger.error(f"HuggingFace error: {e}") raise HTTPException(status_code=500, detail=str(e)) app=FastAPI() class QueryRequest(BaseModel): message:str max_tokens:int=1024 temperature:float=0.5 @app.post("/generate-response/") async def generate_response(req:QueryRequest): user_input=req.message logger.info(f"Utente dice: {user_input}") # Recupera documenti rilevanti usando RAG relevant_docs = retrieve_relevant_documents(user_input, top_k=3) retrieved_text = "\n".join([doc['text'] for doc in relevant_docs]) sys_msg=create_system_message(knowledge_text, retrieved_text) msgs=[ {"role":"system","content":sys_msg}, {"role":"user","content":user_input} ] # Primo tentativo r1=await call_hf_model(msgs, req.temperature, req.max_tokens) logger.info(f"PRIMA RISPOSTA:\n{r1}") # Se non parte con "PREFIX base:" if not r1.startswith("PREFIX base:"): sc=f"Non hai risposto con query SPARQL su una sola riga. Riprova. Domanda: {user_input}" msgs2=[ {"role":"system","content":sys_msg}, {"role":"assistant","content":r1}, {"role":"user","content":sc} ] r2=await call_hf_model(msgs2,req.temperature,req.max_tokens) logger.info(f"SECONDA RISPOSTA:\n{r2}") if r2.startswith("PREFIX base:"): sparql_query=r2 else: return {"type":"NATURAL","response": r2} else: sparql_query=r1 # Esegui la query con rdflib g=rdflib.Graph() try: g.parse(RDF_FILE,format="xml") except Exception as e: logger.error(f"Parsing RDF error: {e}") return {"type":"ERROR","response":f"Parsing RDF error: {e}"} try: results=g.query(sparql_query) except Exception as e: fallback=f"La query SPARQL ha fallito. Riprova. Domanda: {user_input}" msgs3=[ {"role":"system","content":sys_msg}, {"role":"assistant","content":sparql_query}, {"role":"user","content":fallback} ] r3=await call_hf_model(msgs3,req.temperature,req.max_tokens) if r3.startswith("PREFIX base:"): sparql_query=r3 try: results=g.query(sparql_query) except Exception as e2: return {"type":"ERROR","response":f"Query fallita di nuovo: {e2}"} else: return {"type":"NATURAL","response":r3} if len(results)==0: return {"type":"NATURAL","sparql_query":sparql_query,"response":"Nessun risultato."} # Confeziona risultati row_list=[] for row in results: row_str=", ".join([f"{k}:{v}" for k,v in row.asdict().items()]) row_list.append(row_str) results_str="\n".join(row_list) # Spiegazione exp_prompt=create_explanation_prompt(results_str) msgs4=[ {"role":"system","content":exp_prompt}, {"role":"user","content":""} ] explanation=await call_hf_model(msgs4,req.temperature,req.max_tokens) return { "type":"NATURAL", "sparql_query":sparql_query, "sparql_results":row_list, "explanation":explanation } @app.get("/") def home(): return {"message":"Prompt lascia libertà su come chiamare la proprietà del materiale, ma suggerisce un possibile 'materialeOpera'."}