|
import os |
|
import logging |
|
from typing import Optional |
|
from pydantic import BaseModel |
|
from fastapi import FastAPI, HTTPException |
|
import rdflib |
|
from rdflib import RDF, RDFS, OWL |
|
from huggingface_hub import InferenceClient |
|
from sentence_transformers import SentenceTransformer |
|
import faiss |
|
import json |
|
import numpy as np |
|
|
|
logging.basicConfig( |
|
level=logging.DEBUG, |
|
format="%(asctime)s - %(levelname)s - %(message)s", |
|
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
API_KEY = os.getenv("HF_API_KEY") |
|
if not API_KEY: |
|
logger.error("HF_API_KEY non impostata.") |
|
raise EnvironmentError("HF_API_KEY non impostata.") |
|
|
|
client = InferenceClient(api_key=API_KEY) |
|
|
|
RDF_FILE = "Ontologia.rdf" |
|
HF_MODEL = "Qwen/Qwen2.5-72B-Instruct" |
|
|
|
MAX_CLASSES = 30 |
|
MAX_PROPERTIES = 30 |
|
|
|
|
|
with open("data/documents.json", "r", encoding="utf-8") as f: |
|
documents = json.load(f) |
|
index = faiss.read_index("data/faiss.index") |
|
model = SentenceTransformer('all-MiniLM-L6-v2') |
|
|
|
def retrieve_relevant_documents(query: str, top_k: int = 5): |
|
query_embedding = model.encode([query], convert_to_numpy=True) |
|
distances, indices = index.search(query_embedding, top_k) |
|
relevant_docs = [documents[idx] for idx in indices[0]] |
|
return relevant_docs |
|
|
|
def extract_classes_and_properties(rdf_file:str) -> str: |
|
""" |
|
Carica l'ontologia e crea un 'sunto' di Classi e Proprietà |
|
(senza NamedIndividuals) per ridurre i token. |
|
""" |
|
if not os.path.exists(rdf_file): |
|
return "NO_RDF_FILE" |
|
|
|
g = rdflib.Graph() |
|
try: |
|
g.parse(rdf_file, format="xml") |
|
except Exception as e: |
|
logger.error(f"Parsing RDF error: {e}") |
|
return "PARSING_ERROR" |
|
|
|
|
|
classes_found = set() |
|
for s in g.subjects(RDF.type, OWL.Class): |
|
classes_found.add(s) |
|
for s in g.subjects(RDF.type, RDFS.Class): |
|
classes_found.add(s) |
|
classes_list = sorted(str(c) for c in classes_found) |
|
classes_list = classes_list[:MAX_CLASSES] |
|
|
|
|
|
props_found = set() |
|
for p in g.subjects(RDF.type, OWL.ObjectProperty): |
|
props_found.add(p) |
|
for p in g.subjects(RDF.type, OWL.DatatypeProperty): |
|
props_found.add(p) |
|
for p in g.subjects(RDF.type, RDF.Property): |
|
props_found.add(p) |
|
props_list = sorted(str(x) for x in props_found) |
|
props_list = props_list[:MAX_PROPERTIES] |
|
|
|
txt_classes = "\n".join([f"- CLASSE: {c}" for c in classes_list]) |
|
txt_props = "\n".join([f"- PROPRIETA': {p}" for p in props_list]) |
|
|
|
summary = f"""\ |
|
# CLASSI (max {MAX_CLASSES}) |
|
{txt_classes} |
|
# PROPRIETA' (max {MAX_PROPERTIES}) |
|
{txt_props} |
|
""" |
|
return summary |
|
|
|
knowledge_text = extract_classes_and_properties(RDF_FILE) |
|
|
|
def create_system_message(ont_text:str, retrieved_docs:str)->str: |
|
""" |
|
Prompt di sistema robusto, con regole su query in una riga e |
|
informazioni recuperate tramite RAG. |
|
""" |
|
return f""" |
|
Sei un assistente museale. Ecco un estratto di CLASSI e PROPRIETA' dell'ontologia (senza NamedIndividuals): |
|
--- ONTOLOGIA --- |
|
{ont_text} |
|
--- FINE --- |
|
Ecco alcune informazioni rilevanti recuperate dalla base di conoscenza: |
|
{retrieved_docs} |
|
Suggerimento: se l'utente chiede il 'materiale' di un'opera, potresti usare qualcosa come |
|
'base:materialeOpera' o un'altra proprietà simile (se esiste). Non è tassativo: usa |
|
la proprietà che ritieni più affine se ci sono riferimenti in ontologia. |
|
REGOLE STRINGENTI: |
|
1) Se l'utente chiede info su questa ontologia, genera SEMPRE una query SPARQL in UNA SOLA RIGA, |
|
con prefix: |
|
PREFIX base: <http://www.semanticweb.org/lucreziamosca/ontologies/progettoMuseo#> |
|
2) Se la query produce 0 risultati o fallisce, ritenta con un secondo tentativo. |
|
3) Se la domanda è generica (tipo 'Ciao, come stai?'), rispondi breve. |
|
4) Se trovi risultati, risposta finale = la query SPARQL (una sola riga). |
|
5) Se non trovi nulla, di' 'Nessuna info.' |
|
6) Non multiline. Esempio: PREFIX base: <...> SELECT ?x WHERE { ... }. |
|
FINE REGOLE |
|
""" |
|
|
|
def create_explanation_prompt(results_str:str)->str: |
|
return f""" |
|
Ho ottenuto questi risultati SPARQL: |
|
{results_str} |
|
Ora fornisci una breve spiegazione museale (massimo ~10 righe), senza inventare oltre i risultati. |
|
""" |
|
|
|
async def call_hf_model(messages, temperature=0.5, max_tokens=1024)->str: |
|
logger.debug("Chiamo HF con i seguenti messaggi:") |
|
for m in messages: |
|
logger.debug(f"ROLE={m['role']} => {m['content'][:300]}") |
|
try: |
|
resp = client.chat.completions.create( |
|
model=HF_MODEL, |
|
messages=messages, |
|
temperature=temperature, |
|
max_tokens=max_tokens, |
|
top_p=0.9 |
|
) |
|
raw=resp["choices"][0]["message"]["content"] |
|
|
|
single_line = " ".join(raw.splitlines()) |
|
logger.debug(f"Risposta HF single-line: {single_line}") |
|
return single_line.strip() |
|
except Exception as e: |
|
logger.error(f"HuggingFace error: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
app=FastAPI() |
|
|
|
class QueryRequest(BaseModel): |
|
message:str |
|
max_tokens:int=1024 |
|
temperature:float=0.5 |
|
|
|
@app.post("/generate-response/") |
|
async def generate_response(req:QueryRequest): |
|
user_input=req.message |
|
logger.info(f"Utente dice: {user_input}") |
|
|
|
|
|
relevant_docs = retrieve_relevant_documents(user_input, top_k=3) |
|
retrieved_text = "\n".join([doc['text'] for doc in relevant_docs]) |
|
|
|
sys_msg=create_system_message(knowledge_text, retrieved_text) |
|
msgs=[ |
|
{"role":"system","content":sys_msg}, |
|
{"role":"user","content":user_input} |
|
] |
|
|
|
r1=await call_hf_model(msgs, req.temperature, req.max_tokens) |
|
logger.info(f"PRIMA RISPOSTA:\n{r1}") |
|
|
|
|
|
if not r1.startswith("PREFIX base:"): |
|
sc=f"Non hai risposto con query SPARQL su una sola riga. Riprova. Domanda: {user_input}" |
|
msgs2=[ |
|
{"role":"system","content":sys_msg}, |
|
{"role":"assistant","content":r1}, |
|
{"role":"user","content":sc} |
|
] |
|
r2=await call_hf_model(msgs2,req.temperature,req.max_tokens) |
|
logger.info(f"SECONDA RISPOSTA:\n{r2}") |
|
if r2.startswith("PREFIX base:"): |
|
sparql_query=r2 |
|
else: |
|
return {"type":"NATURAL","response": r2} |
|
else: |
|
sparql_query=r1 |
|
|
|
|
|
g=rdflib.Graph() |
|
try: |
|
g.parse(RDF_FILE,format="xml") |
|
except Exception as e: |
|
logger.error(f"Parsing RDF error: {e}") |
|
return {"type":"ERROR","response":f"Parsing RDF error: {e}"} |
|
|
|
try: |
|
results=g.query(sparql_query) |
|
except Exception as e: |
|
fallback=f"La query SPARQL ha fallito. Riprova. Domanda: {user_input}" |
|
msgs3=[ |
|
{"role":"system","content":sys_msg}, |
|
{"role":"assistant","content":sparql_query}, |
|
{"role":"user","content":fallback} |
|
] |
|
r3=await call_hf_model(msgs3,req.temperature,req.max_tokens) |
|
if r3.startswith("PREFIX base:"): |
|
sparql_query=r3 |
|
try: |
|
results=g.query(sparql_query) |
|
except Exception as e2: |
|
return {"type":"ERROR","response":f"Query fallita di nuovo: {e2}"} |
|
else: |
|
return {"type":"NATURAL","response":r3} |
|
|
|
if len(results)==0: |
|
return {"type":"NATURAL","sparql_query":sparql_query,"response":"Nessun risultato."} |
|
|
|
|
|
row_list=[] |
|
for row in results: |
|
row_str=", ".join([f"{k}:{v}" for k,v in row.asdict().items()]) |
|
row_list.append(row_str) |
|
results_str="\n".join(row_list) |
|
|
|
|
|
exp_prompt=create_explanation_prompt(results_str) |
|
msgs4=[ |
|
{"role":"system","content":exp_prompt}, |
|
{"role":"user","content":""} |
|
] |
|
explanation=await call_hf_model(msgs4,req.temperature,req.max_tokens) |
|
|
|
return { |
|
"type":"NATURAL", |
|
"sparql_query":sparql_query, |
|
"sparql_results":row_list, |
|
"explanation":explanation |
|
} |
|
|
|
@app.get("/") |
|
def home(): |
|
return {"message":"Prompt lascia libertà su come chiamare la proprietà del materiale, ma suggerisce un possibile 'materialeOpera'."} |
|
|