# Ontology-RAG-Demo / src/ontology_manager.py
# Author: AD2000X
# Commit e790587 (verified): Update src/ontology_manager.py
# src/ontology_manager.py
import json
import networkx as nx
from typing import Dict, List, Any, Optional, Union, Set
class OntologyManager:
"""
Manages the ontology model and provides methods for querying and navigating
the ontological structure.
"""
def __init__(self, ontology_path: str):
"""
Initialize the ontology manager with a path to the ontology JSON file.
Args:
ontology_path: Path to the JSON file containing the ontology model
"""
self.ontology_path = ontology_path
self.ontology_data = self._load_ontology()
self.graph = nx.MultiDiGraph()
self._build_graph()
def _load_ontology(self) -> Dict:
"""Load the ontology from the JSON file."""
with open(self.ontology_path, 'r') as f:
return json.load(f)
def _build_graph(self):
"""Build the ontology graph from the JSON data."""
# Add classes
for class_id, class_data in self.ontology_data["classes"].items():
self.graph.add_node(
class_id,
type="class",
description=class_data.get("description", ""),
properties=class_data.get("properties", [])
)
# Handle subclass relations
if "subClassOf" in class_data:
parent = class_data["subClassOf"]
self.graph.add_edge(class_id, parent, type="subClassOf")
# Add relationships (schema-level only, no edge added yet)
for rel in self.ontology_data.get("relationships", []):
pass # schema relationships are used for metadata, not edges
# Add instances
for instance in self.ontology_data.get("instances", []):
instance_id = instance["id"]
class_type = instance["type"]
properties = instance.get("properties", {})
# Add the instance node
self.graph.add_node(
instance_id,
type="instance",
class_type=class_type,
properties=properties
)
# Link instance to its class
self.graph.add_edge(instance_id, class_type, type="instanceOf")
# Add relationship edges if any
for rel in instance.get("relationships", []):
target = rel.get("target")
rel_type = rel.get("type")
if target and rel_type:
self.graph.add_edge(instance_id, target, type=rel_type)
def get_classes(self) -> List[str]:
"""Return a list of all class names in the ontology."""
return list(self.ontology_data["classes"].keys())
def get_class_hierarchy(self) -> Dict[str, List[str]]:
"""Return a dictionary mapping each class to its subclasses."""
hierarchy = {}
for class_id in self.get_classes():
hierarchy[class_id] = []
for class_id, class_data in self.ontology_data["classes"].items():
if "subClassOf" in class_data:
parent = class_data["subClassOf"]
if parent in hierarchy:
hierarchy[parent].append(class_id)
return hierarchy
def get_instances_of_class(self, class_name: str, include_subclasses: bool = True) -> List[str]:
"""
Get all instances of a given class.
Args:
class_name: The name of the class
include_subclasses: Whether to include instances of subclasses
Returns:
A list of instance IDs
"""
if include_subclasses:
# Get all subclasses recursively
subclasses = set(self._get_all_subclasses(class_name))
subclasses.add(class_name)
# Get instances of all classes
instances = []
for class_id in subclasses:
instances.extend([
n for n, attr in self.graph.nodes(data=True)
if attr.get("type") == "instance" and attr.get("class_type") == class_id
])
return instances
else:
# Just get direct instances
return [
n for n, attr in self.graph.nodes(data=True)
if attr.get("type") == "instance" and attr.get("class_type") == class_name
]
def _get_all_subclasses(self, class_name: str) -> List[str]:
"""Recursively get all subclasses of a given class."""
subclasses = []
direct_subclasses = [
src for src, dst, data in self.graph.edges(data=True)
if dst == class_name and data.get("type") == "subClassOf"
]
for subclass in direct_subclasses:
subclasses.append(subclass)
subclasses.extend(self._get_all_subclasses(subclass))
return subclasses
def get_relationships(self, entity_id: str, relationship_type: Optional[str] = None) -> List[Dict]:
"""
Get all relationships for a given entity, optionally filtered by type.
Args:
entity_id: The ID of the entity
relationship_type: Optional relationship type to filter by
Returns:
A list of dictionaries containing relationship information
"""
relationships = []
# Look at outgoing edges
for _, target, data in self.graph.out_edges(entity_id, data=True):
rel_type = data.get("type")
if rel_type != "instanceOf" and rel_type != "subClassOf":
if relationship_type is None or rel_type == relationship_type:
relationships.append({
"type": rel_type,
"target": target,
"direction": "outgoing"
})
# Look at incoming edges
for source, _, data in self.graph.in_edges(entity_id, data=True):
rel_type = data.get("type")
if rel_type != "instanceOf" and rel_type != "subClassOf":
if relationship_type is None or rel_type == relationship_type:
relationships.append({
"type": rel_type,
"source": source,
"direction": "incoming"
})
return relationships
def find_paths(self, source_id: str, target_id: str, max_length: int = 3) -> List[List[Dict]]:
"""
Find all paths between two entities up to a maximum length.
Args:
source_id: Starting entity ID
target_id: Target entity ID
max_length: Maximum path length
Returns:
A list of paths, where each path is a list of relationship dictionaries
"""
paths = []
# Use networkx to find simple paths
simple_paths = nx.all_simple_paths(self.graph, source_id, target_id, cutoff=max_length)
for path in simple_paths:
path_with_edges = []
for i in range(len(path) - 1):
source = path[i]
target = path[i + 1]
# There may be multiple edges between nodes
edges = self.graph.get_edge_data(source, target)
if edges:
for key, data in edges.items():
path_with_edges.append({
"source": source,
"target": target,
"type": data.get("type", "unknown")
})
paths.append(path_with_edges)
return paths
def get_entity_info(self, entity_id: str) -> Dict:
"""
Get detailed information about an entity.
Args:
entity_id: The ID of the entity
Returns:
A dictionary with entity information
"""
if entity_id not in self.graph:
return {}
node_data = self.graph.nodes[entity_id]
entity_type = node_data.get("type")
if entity_type == "instance":
# Get class information
class_type = node_data.get("class_type")
class_info = self.ontology_data["classes"].get(class_type, {})
return {
"id": entity_id,
"type": entity_type,
"class": class_type,
"class_description": class_info.get("description", ""),
"properties": node_data.get("properties", {}),
"relationships": self.get_relationships(entity_id)
}
elif entity_type == "class":
return {
"id": entity_id,
"type": entity_type,
"description": node_data.get("description", ""),
"properties": node_data.get("properties", []),
"subclasses": self._get_all_subclasses(entity_id),
"instances": self.get_instances_of_class(entity_id)
}
return node_data
def get_text_representation(self) -> str:
"""
Generate a text representation of the ontology for embedding.
Returns:
A string containing the textual representation of the ontology
"""
text_chunks = []
# Class definitions
for class_id, class_data in self.ontology_data["classes"].items():
chunk = f"Class: {class_id}\n"
chunk += f"Description: {class_data.get('description', '')}\n"
if "subClassOf" in class_data:
chunk += f"{class_id} is a subclass of {class_data['subClassOf']}.\n"
if "properties" in class_data:
chunk += f"{class_id} has properties: {', '.join(class_data['properties'])}.\n"
text_chunks.append(chunk)
# Relationship definitions
for rel in self.ontology_data["relationships"]:
chunk = f"Relationship: {rel['name']}\n"
chunk += f"Domain: {rel['domain']}, Range: {rel['range']}\n"
chunk += f"Description: {rel.get('description', '')}\n"
chunk += f"Cardinality: {rel.get('cardinality', 'many-to-many')}\n"
if "inverse" in rel:
chunk += f"The inverse relationship is {rel['inverse']}.\n"
text_chunks.append(chunk)
# Rules
for rule in self.ontology_data.get("rules", []):
chunk = f"Rule: {rule.get('id', '')}\n"
chunk += f"Description: {rule.get('description', '')}\n"
text_chunks.append(chunk)
# Instance data
for instance in self.ontology_data["instances"]:
chunk = f"Instance: {instance['id']}\n"
chunk += f"Type: {instance['type']}\n"
# Properties
if "properties" in instance:
props = []
for key, value in instance["properties"].items():
if isinstance(value, list):
props.append(f"{key}: {', '.join(str(v) for v in value)}")
else:
props.append(f"{key}: {value}")
if props:
chunk += "Properties:\n- " + "\n- ".join(props) + "\n"
# Relationships
if "relationships" in instance:
rels = []
for rel in instance["relationships"]:
rels.append(f"{rel['type']} {rel['target']}")
if rels:
chunk += "Relationships:\n- " + "\n- ".join(rels) + "\n"
text_chunks.append(chunk)
return "\n\n".join(text_chunks)
def query_by_relationship(self, source_type: str, relationship: str, target_type: str) -> List[Dict]:
"""
Query for instances connected by a specific relationship.
Args:
source_type: Type of the source entity
relationship: Type of relationship
target_type: Type of the target entity
Returns:
A list of matching relationship dictionaries
"""
results = []
# Get all instances of the source type
source_instances = self.get_instances_of_class(source_type)
for source_id in source_instances:
# Get relationships of the specified type
relationships = self.get_relationships(source_id, relationship)
for rel in relationships:
if rel["direction"] == "outgoing" and "target" in rel:
target_id = rel["target"]
target_data = self.graph.nodes[target_id]
# Check if the target is of the right type
if (target_data.get("type") == "instance" and
target_data.get("class_type") == target_type):
results.append({
"source": source_id,
"source_properties": self.graph.nodes[source_id].get("properties", {}),
"relationship": relationship,
"target": target_id,
"target_properties": target_data.get("properties", {})
})
return results
def get_semantic_context(self, query: str) -> List[str]:
"""
Retrieve relevant semantic context from the ontology based on a query.
This method identifies entities and relationships mentioned in the query
and returns contextual information about them from the ontology.
Args:
query: The query string to analyze
Returns:
A list of text chunks providing relevant ontological context
"""
# This is a simple implementation - a more sophisticated one would use
# entity recognition and semantic parsing
query_lower = query.lower()
context_chunks = []
# Check for class mentions
for class_id in self.get_classes():
if class_id.lower() in query_lower:
# Add class information
class_data = self.ontology_data["classes"][class_id]
chunk = f"Class {class_id}: {class_data.get('description', '')}\n"
# Add subclass information
if "subClassOf" in class_data:
parent = class_data["subClassOf"]
chunk += f"{class_id} is a subclass of {parent}.\n"
# Add property information
if "properties" in class_data:
chunk += f"{class_id} has properties: {', '.join(class_data['properties'])}.\n"
context_chunks.append(chunk)
# Also add some instance examples
instances = self.get_instances_of_class(class_id, include_subclasses=False)[:3]
if instances:
instance_chunk = f"Examples of {class_id}:\n"
for inst_id in instances:
props = self.graph.nodes[inst_id].get("properties", {})
if "name" in props:
instance_chunk += f"- {inst_id} ({props['name']})\n"
else:
instance_chunk += f"- {inst_id}\n"
context_chunks.append(instance_chunk)
# Check for relationship mentions
for rel in self.ontology_data["relationships"]:
if rel["name"].lower() in query_lower:
chunk = f"Relationship {rel['name']}: {rel.get('description', '')}\n"
chunk += f"This relationship connects {rel['domain']} to {rel['range']}.\n"
# Add examples
examples = self.query_by_relationship(rel['domain'], rel['name'], rel['range'])[:3]
if examples:
chunk += "Examples:\n"
for ex in examples:
source_props = ex["source_properties"]
target_props = ex["target_properties"]
source_name = source_props.get("name", ex["source"])
target_name = target_props.get("name", ex["target"])
chunk += f"- {source_name} {rel['name']} {target_name}\n"
context_chunks.append(chunk)
# If we found nothing specific, add general ontology info
if not context_chunks:
# Add information about top-level classes
top_classes = [c for c, data in self.ontology_data["classes"].items()
if "subClassOf" not in data or data["subClassOf"] == "Entity"]
if top_classes:
chunk = "Main classes in the ontology:\n"
for cls in top_classes:
desc = self.ontology_data["classes"][cls].get("description", "")
chunk += f"- {cls}: {desc}\n"
context_chunks.append(chunk)
# Add information about key relationships
if self.ontology_data["relationships"]:
chunk = "Key relationships in the ontology:\n"
for rel in self.ontology_data["relationships"][:5]: # Top 5 relationships
chunk += f"- {rel['name']}: {rel.get('description', '')}\n"
context_chunks.append(chunk)
return context_chunks