IA2_model / app.py
AshenClock's picture
Update app.py
d9ab7eb verified
raw
history blame
8.38 kB
import os
import logging
from typing import Optional
from pydantic import BaseModel
from fastapi import FastAPI, HTTPException
import rdflib
from rdflib import RDF, RDFS, OWL
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer
import faiss
import json
import numpy as np
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
API_KEY = os.getenv("HF_API_KEY")
if not API_KEY:
logger.error("HF_API_KEY non impostata.")
raise EnvironmentError("HF_API_KEY non impostata.")
client = InferenceClient(api_key=API_KEY)
RDF_FILE = "Ontologia.rdf"
HF_MODEL = "Qwen/Qwen2.5-72B-Instruct"
MAX_CLASSES = 30
MAX_PROPERTIES = 30
# Carica i documenti e l'indice FAISS
with open("data/documents.json", "r", encoding="utf-8") as f:
documents = json.load(f)
index = faiss.read_index("data/faiss.index")
model = SentenceTransformer('all-MiniLM-L6-v2')
def retrieve_relevant_documents(query: str, top_k: int = 5):
query_embedding = model.encode([query], convert_to_numpy=True)
distances, indices = index.search(query_embedding, top_k)
relevant_docs = [documents[idx] for idx in indices[0]]
return relevant_docs
def extract_classes_and_properties(rdf_file:str) -> str:
"""
Carica l'ontologia e crea un 'sunto' di Classi e Proprietà
(senza NamedIndividuals) per ridurre i token.
"""
if not os.path.exists(rdf_file):
return "NO_RDF_FILE"
g = rdflib.Graph()
try:
g.parse(rdf_file, format="xml")
except Exception as e:
logger.error(f"Parsing RDF error: {e}")
return "PARSING_ERROR"
# Troviamo le classi
classes_found = set()
for s in g.subjects(RDF.type, OWL.Class):
classes_found.add(s)
for s in g.subjects(RDF.type, RDFS.Class):
classes_found.add(s)
classes_list = sorted(str(c) for c in classes_found)
classes_list = classes_list[:MAX_CLASSES]
# Troviamo le proprietà
props_found = set()
for p in g.subjects(RDF.type, OWL.ObjectProperty):
props_found.add(p)
for p in g.subjects(RDF.type, OWL.DatatypeProperty):
props_found.add(p)
for p in g.subjects(RDF.type, RDF.Property):
props_found.add(p)
props_list = sorted(str(x) for x in props_found)
props_list = props_list[:MAX_PROPERTIES]
txt_classes = "\n".join([f"- CLASSE: {c}" for c in classes_list])
txt_props = "\n".join([f"- PROPRIETA': {p}" for p in props_list])
summary = f"""\
# CLASSI (max {MAX_CLASSES})
{txt_classes}
# PROPRIETA' (max {MAX_PROPERTIES})
{txt_props}
"""
return summary
knowledge_text = extract_classes_and_properties(RDF_FILE)
def create_system_message(ont_text:str, retrieved_docs:str)->str:
"""
Prompt di sistema robusto, con regole su query in una riga e
informazioni recuperate tramite RAG.
"""
return f"""
Sei un assistente museale. Ecco un estratto di CLASSI e PROPRIETA' dell'ontologia (senza NamedIndividuals):
--- ONTOLOGIA ---
{ont_text}
--- FINE ---
Ecco alcune informazioni rilevanti recuperate dalla base di conoscenza:
{retrieved_docs}
Suggerimento: se l'utente chiede il 'materiale' di un'opera, potresti usare qualcosa come
'base:materialeOpera' o un'altra proprietà simile (se esiste). Non è tassativo: usa
la proprietà che ritieni più affine se ci sono riferimenti in ontologia.
REGOLE STRINGENTI:
1) Se l'utente chiede info su questa ontologia, genera SEMPRE una query SPARQL in UNA SOLA RIGA,
con prefix:
PREFIX base: <http://www.semanticweb.org/lucreziamosca/ontologies/progettoMuseo#>
2) Se la query produce 0 risultati o fallisce, ritenta con un secondo tentativo.
3) Se la domanda è generica (tipo 'Ciao, come stai?'), rispondi breve.
4) Se trovi risultati, risposta finale = la query SPARQL (una sola riga).
5) Se non trovi nulla, di' 'Nessuna info.'
6) Non multiline. Esempio: PREFIX base: <...> SELECT ?x WHERE { ... }.
FINE REGOLE
"""
def create_explanation_prompt(results_str:str)->str:
return f"""
Ho ottenuto questi risultati SPARQL:
{results_str}
Ora fornisci una breve spiegazione museale (massimo ~10 righe), senza inventare oltre i risultati.
"""
async def call_hf_model(messages, temperature=0.5, max_tokens=1024)->str:
logger.debug("Chiamo HF con i seguenti messaggi:")
for m in messages:
logger.debug(f"ROLE={m['role']} => {m['content'][:300]}")
try:
resp = client.chat.completions.create(
model=HF_MODEL,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=0.9
)
raw=resp["choices"][0]["message"]["content"]
# Forziamo la query su linea singola se multiline
single_line = " ".join(raw.splitlines())
logger.debug(f"Risposta HF single-line: {single_line}")
return single_line.strip()
except Exception as e:
logger.error(f"HuggingFace error: {e}")
raise HTTPException(status_code=500, detail=str(e))
app=FastAPI()
class QueryRequest(BaseModel):
message:str
max_tokens:int=1024
temperature:float=0.5
@app.post("/generate-response/")
async def generate_response(req:QueryRequest):
user_input=req.message
logger.info(f"Utente dice: {user_input}")
# Recupera documenti rilevanti usando RAG
relevant_docs = retrieve_relevant_documents(user_input, top_k=3)
retrieved_text = "\n".join([doc['text'] for doc in relevant_docs])
sys_msg=create_system_message(knowledge_text, retrieved_text)
msgs=[
{"role":"system","content":sys_msg},
{"role":"user","content":user_input}
]
# Primo tentativo
r1=await call_hf_model(msgs, req.temperature, req.max_tokens)
logger.info(f"PRIMA RISPOSTA:\n{r1}")
# Se non parte con "PREFIX base:"
if not r1.startswith("PREFIX base:"):
sc=f"Non hai risposto con query SPARQL su una sola riga. Riprova. Domanda: {user_input}"
msgs2=[
{"role":"system","content":sys_msg},
{"role":"assistant","content":r1},
{"role":"user","content":sc}
]
r2=await call_hf_model(msgs2,req.temperature,req.max_tokens)
logger.info(f"SECONDA RISPOSTA:\n{r2}")
if r2.startswith("PREFIX base:"):
sparql_query=r2
else:
return {"type":"NATURAL","response": r2}
else:
sparql_query=r1
# Esegui la query con rdflib
g=rdflib.Graph()
try:
g.parse(RDF_FILE,format="xml")
except Exception as e:
logger.error(f"Parsing RDF error: {e}")
return {"type":"ERROR","response":f"Parsing RDF error: {e}"}
try:
results=g.query(sparql_query)
except Exception as e:
fallback=f"La query SPARQL ha fallito. Riprova. Domanda: {user_input}"
msgs3=[
{"role":"system","content":sys_msg},
{"role":"assistant","content":sparql_query},
{"role":"user","content":fallback}
]
r3=await call_hf_model(msgs3,req.temperature,req.max_tokens)
if r3.startswith("PREFIX base:"):
sparql_query=r3
try:
results=g.query(sparql_query)
except Exception as e2:
return {"type":"ERROR","response":f"Query fallita di nuovo: {e2}"}
else:
return {"type":"NATURAL","response":r3}
if len(results)==0:
return {"type":"NATURAL","sparql_query":sparql_query,"response":"Nessun risultato."}
# Confeziona risultati
row_list=[]
for row in results:
row_str=", ".join([f"{k}:{v}" for k,v in row.asdict().items()])
row_list.append(row_str)
results_str="\n".join(row_list)
# Spiegazione
exp_prompt=create_explanation_prompt(results_str)
msgs4=[
{"role":"system","content":exp_prompt},
{"role":"user","content":""}
]
explanation=await call_hf_model(msgs4,req.temperature,req.max_tokens)
return {
"type":"NATURAL",
"sparql_query":sparql_query,
"sparql_results":row_list,
"explanation":explanation
}
@app.get("/")
def home():
return {"message":"Prompt lascia libertà su come chiamare la proprietà del materiale, ma suggerisce un possibile 'materialeOpera'."}