Spaces:
Running
Running
# src/ontology_manager.py | |
import json | |
import networkx as nx | |
from typing import Dict, List, Any, Optional, Union, Set | |
class OntologyManager:
    """
    Manages the ontology model and provides methods for querying and navigating
    the ontological structure.

    The ontology is read from a JSON document and mirrored into a
    ``networkx.MultiDiGraph``. The sections of the document used by this class
    are:

    * ``"classes"`` (required): mapping of class id to a dict with optional
      ``"description"``, ``"properties"`` and ``"subClassOf"`` entries.
    * ``"relationships"`` (optional): list of schema-level relationship dicts
      with ``"name"``, ``"domain"``, ``"range"`` and optional
      ``"description"``, ``"cardinality"`` and ``"inverse"`` entries.
    * ``"rules"`` (optional): list of dicts with ``"id"`` and ``"description"``.
    * ``"instances"`` (optional): list of dicts with ``"id"``, ``"type"`` and
      optional ``"properties"`` and ``"relationships"`` (each relationship
      being a dict with ``"type"`` and ``"target"``).

    In the graph, class nodes carry ``type="class"``, instance nodes carry
    ``type="instance"``, and every edge carries a ``type`` attribute:
    ``"subClassOf"``, ``"instanceOf"`` or the name of a domain relationship.
    """

    # Edge types that encode ontology structure rather than domain relations.
    _STRUCTURAL_EDGE_TYPES = ("instanceOf", "subClassOf")

    def __init__(self, ontology_path: str):
        """
        Initialize the ontology manager with a path to the ontology JSON file.

        Args:
            ontology_path: Path to the JSON file containing the ontology model
        """
        self.ontology_path = ontology_path
        self.ontology_data = self._load_ontology()
        self.graph = nx.MultiDiGraph()
        self._build_graph()

    def _load_ontology(self) -> Dict:
        """Load the ontology from the JSON file."""
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(self.ontology_path, 'r', encoding="utf-8") as f:
            return json.load(f)

    def _build_graph(self):
        """Build the ontology graph from the JSON data."""
        # Classes and their subclass links.
        for class_id, class_data in self.ontology_data["classes"].items():
            self.graph.add_node(
                class_id,
                type="class",
                description=class_data.get("description", ""),
                properties=class_data.get("properties", [])
            )
            if "subClassOf" in class_data:
                self.graph.add_edge(
                    class_id, class_data["subClassOf"], type="subClassOf"
                )

        # Schema-level relationships are metadata only; they are read directly
        # from ``ontology_data`` where needed and never become graph edges.

        # Instances, their class membership and their outgoing relationships.
        for instance in self.ontology_data.get("instances", []):
            instance_id = instance["id"]
            class_type = instance["type"]
            self.graph.add_node(
                instance_id,
                type="instance",
                class_type=class_type,
                properties=instance.get("properties", {})
            )
            self.graph.add_edge(instance_id, class_type, type="instanceOf")
            for rel in instance.get("relationships", []):
                target = rel.get("target")
                rel_type = rel.get("type")
                # Incomplete relationship entries are skipped silently.
                if target and rel_type:
                    self.graph.add_edge(instance_id, target, type=rel_type)

    def get_classes(self) -> List[str]:
        """Return a list of all class names in the ontology."""
        return list(self.ontology_data["classes"].keys())

    def get_class_hierarchy(self) -> Dict[str, List[str]]:
        """
        Return a dictionary mapping each class to its direct subclasses.

        Classes whose declared parent is not itself a declared class are still
        listed as keys, but are not attached to any parent.
        """
        hierarchy: Dict[str, List[str]] = {
            class_id: [] for class_id in self.get_classes()
        }
        for class_id, class_data in self.ontology_data["classes"].items():
            parent = class_data.get("subClassOf") if "subClassOf" in class_data else None
            if "subClassOf" in class_data and parent in hierarchy:
                hierarchy[parent].append(class_id)
        return hierarchy

    def get_instances_of_class(self, class_name: str, include_subclasses: bool = True) -> List[str]:
        """
        Get all instances of a given class.

        Args:
            class_name: The name of the class
            include_subclasses: Whether to include instances of subclasses

        Returns:
            A list of instance IDs, in graph node order
        """
        wanted = {class_name}
        if include_subclasses:
            wanted.update(self._get_all_subclasses(class_name))
        # A single pass over the nodes keeps the result order deterministic
        # (node insertion order) instead of depending on set iteration order.
        return [
            node for node, attrs in self.graph.nodes(data=True)
            if attrs.get("type") == "instance" and attrs.get("class_type") in wanted
        ]

    def _get_all_subclasses(self, class_name: str) -> List[str]:
        """
        Get all direct and indirect subclasses of a given class.

        The result is in depth-first pre-order (a subclass is followed by its
        own subclasses). Each class appears at most once and ``class_name``
        itself is never included, so cyclic ``subClassOf`` data terminates.
        """
        # Index parent -> children once instead of rescanning every edge at
        # each level of the hierarchy.
        children: Dict[str, List[str]] = {}
        for src, dst, data in self.graph.edges(data=True):
            if data.get("type") == "subClassOf":
                children.setdefault(dst, []).append(src)

        ordered: List[str] = []
        seen = {class_name}
        # Reversed pushes so that pops visit children in their original order.
        stack = list(reversed(children.get(class_name, [])))
        while stack:
            current = stack.pop()
            if current in seen:
                continue
            seen.add(current)
            ordered.append(current)
            stack.extend(reversed(children.get(current, [])))
        return ordered

    def get_relationships(self, entity_id: str, relationship_type: Optional[str] = None) -> List[Dict]:
        """
        Get all relationships for a given entity, optionally filtered by type.

        Structural edges (``instanceOf`` and ``subClassOf``) are never
        reported.

        Args:
            entity_id: The ID of the entity
            relationship_type: Optional relationship type to filter by

        Returns:
            A list of dictionaries containing relationship information.
            Outgoing entries have ``"type"``, ``"target"`` and ``"direction"``;
            incoming entries have ``"type"``, ``"source"`` and ``"direction"``.
            Unknown entities yield an empty list.
        """
        # Guard explicitly: networkx would otherwise treat an unknown string
        # id as an iterable of node ids (its individual characters).
        if entity_id not in self.graph:
            return []

        def is_wanted(rel_type) -> bool:
            if rel_type in self._STRUCTURAL_EDGE_TYPES:
                return False
            return relationship_type is None or rel_type == relationship_type

        relationships = []
        for _, target, data in self.graph.out_edges(entity_id, data=True):
            rel_type = data.get("type")
            if is_wanted(rel_type):
                relationships.append({
                    "type": rel_type,
                    "target": target,
                    "direction": "outgoing"
                })
        for source, _, data in self.graph.in_edges(entity_id, data=True):
            rel_type = data.get("type")
            if is_wanted(rel_type):
                relationships.append({
                    "type": rel_type,
                    "source": source,
                    "direction": "incoming"
                })
        return relationships

    def find_paths(self, source_id: str, target_id: str, max_length: int = 3) -> List[List[Dict]]:
        """
        Find all paths between two entities up to a maximum length.

        Args:
            source_id: Starting entity ID
            target_id: Target entity ID
            max_length: Maximum path length (number of hops)

        Returns:
            A list of paths, where each path is a list of relationship
            dictionaries (``"source"``, ``"target"``, ``"type"``). When two
            consecutive nodes are joined by several parallel edges, one
            dictionary per edge is listed for that hop. Returns an empty list
            if either entity is unknown.
        """
        if source_id not in self.graph or target_id not in self.graph:
            return []

        paths = []
        # On a multigraph the same node sequence is yielded once per
        # combination of parallel edges; report each sequence only once.
        seen_node_paths = set()
        for node_path in nx.all_simple_paths(
            self.graph, source_id, target_id, cutoff=max_length
        ):
            key = tuple(node_path)
            if key in seen_node_paths:
                continue
            seen_node_paths.add(key)

            hops = []
            for source, target in zip(node_path, node_path[1:]):
                edges = self.graph.get_edge_data(source, target) or {}
                for data in edges.values():
                    hops.append({
                        "source": source,
                        "target": target,
                        "type": data.get("type", "unknown")
                    })
            paths.append(hops)
        return paths

    def get_entity_info(self, entity_id: str) -> Dict:
        """
        Get detailed information about an entity.

        Args:
            entity_id: The ID of the entity

        Returns:
            A dictionary with entity information; empty if the entity is not
            in the graph. Nodes that are neither classes nor instances (for
            example, nodes created only because an edge referenced them)
            yield a copy of their raw node attributes.
        """
        if entity_id not in self.graph:
            return {}

        node_data = self.graph.nodes[entity_id]
        entity_type = node_data.get("type")

        if entity_type == "instance":
            class_type = node_data.get("class_type")
            class_info = self.ontology_data["classes"].get(class_type, {})
            return {
                "id": entity_id,
                "type": entity_type,
                "class": class_type,
                "class_description": class_info.get("description", ""),
                "properties": node_data.get("properties", {}),
                "relationships": self.get_relationships(entity_id)
            }

        if entity_type == "class":
            return {
                "id": entity_id,
                "type": entity_type,
                "description": node_data.get("description", ""),
                "properties": node_data.get("properties", []),
                "subclasses": self._get_all_subclasses(entity_id),
                "instances": self.get_instances_of_class(entity_id)
            }

        # Copy so callers cannot mutate the graph's attribute dict by accident.
        return dict(node_data)

    @staticmethod
    def _class_detail_lines(class_id: str, class_data: Dict) -> str:
        """Return the subclass/properties sentences shared by the text builders."""
        text = ""
        if "subClassOf" in class_data:
            text += f"{class_id} is a subclass of {class_data['subClassOf']}.\n"
        if "properties" in class_data:
            text += f"{class_id} has properties: {', '.join(class_data['properties'])}.\n"
        return text

    def get_text_representation(self) -> str:
        """
        Generate a text representation of the ontology for embedding.

        Chunks are emitted in the order classes, relationships, rules,
        instances. The ``relationships``, ``rules`` and ``instances`` sections
        of the ontology are optional.

        Returns:
            A string containing the textual representation of the ontology,
            with chunks separated by a blank line
        """
        text_chunks = []

        # Class definitions
        for class_id, class_data in self.ontology_data["classes"].items():
            chunk = f"Class: {class_id}\n"
            chunk += f"Description: {class_data.get('description', '')}\n"
            chunk += self._class_detail_lines(class_id, class_data)
            text_chunks.append(chunk)

        # Relationship definitions
        for rel in self.ontology_data.get("relationships", []):
            chunk = f"Relationship: {rel['name']}\n"
            chunk += f"Domain: {rel['domain']}, Range: {rel['range']}\n"
            chunk += f"Description: {rel.get('description', '')}\n"
            chunk += f"Cardinality: {rel.get('cardinality', 'many-to-many')}\n"
            if "inverse" in rel:
                chunk += f"The inverse relationship is {rel['inverse']}.\n"
            text_chunks.append(chunk)

        # Rules
        for rule in self.ontology_data.get("rules", []):
            chunk = f"Rule: {rule.get('id', '')}\n"
            chunk += f"Description: {rule.get('description', '')}\n"
            text_chunks.append(chunk)

        # Instance data
        for instance in self.ontology_data.get("instances", []):
            chunk = f"Instance: {instance['id']}\n"
            chunk += f"Type: {instance['type']}\n"

            props = []
            for key, value in (instance.get("properties") or {}).items():
                if isinstance(value, list):
                    props.append(f"{key}: {', '.join(str(v) for v in value)}")
                else:
                    props.append(f"{key}: {value}")
            if props:
                chunk += "Properties:\n- " + "\n- ".join(props) + "\n"

            # Mirror _build_graph: relationship entries without both a type
            # and a target are ignored rather than raising.
            rels = []
            for rel in instance.get("relationships") or []:
                rel_type = rel.get("type")
                target = rel.get("target")
                if target and rel_type:
                    rels.append(f"{rel_type} {target}")
            if rels:
                chunk += "Relationships:\n- " + "\n- ".join(rels) + "\n"

            text_chunks.append(chunk)

        return "\n\n".join(text_chunks)

    def query_by_relationship(self, source_type: str, relationship: str, target_type: str) -> List[Dict]:
        """
        Query for instances connected by a specific relationship.

        Both ends are matched against the given class or any of its
        subclasses.

        Args:
            source_type: Type of the source entity
            relationship: Type of relationship
            target_type: Type of the target entity

        Returns:
            A list of matching relationship dictionaries with ``"source"``,
            ``"source_properties"``, ``"relationship"``, ``"target"`` and
            ``"target_properties"`` entries
        """
        # Sources already include subclasses; treat the target side the same.
        target_classes = {target_type}
        target_classes.update(self._get_all_subclasses(target_type))

        results = []
        for source_id in self.get_instances_of_class(source_type):
            source_props = self.graph.nodes[source_id].get("properties", {})
            for rel in self.get_relationships(source_id, relationship):
                if rel["direction"] != "outgoing" or "target" not in rel:
                    continue
                target_id = rel["target"]
                target_data = self.graph.nodes[target_id]
                if (target_data.get("type") == "instance" and
                        target_data.get("class_type") in target_classes):
                    results.append({
                        "source": source_id,
                        "source_properties": source_props,
                        "relationship": relationship,
                        "target": target_id,
                        "target_properties": target_data.get("properties", {})
                    })
        return results

    def get_semantic_context(self, query: str) -> List[str]:
        """
        Retrieve relevant semantic context from the ontology based on a query.

        Classes and schema relationships are considered "mentioned" when their
        name occurs in the query as a case-insensitive substring. This is a
        deliberately simple heuristic; it does no entity recognition or
        semantic parsing. If nothing is mentioned, a general overview of the
        ontology is returned instead.

        Args:
            query: The query string to analyze

        Returns:
            A list of text chunks providing relevant ontological context
        """
        query_lower = query.lower()
        schema_relationships = self.ontology_data.get("relationships", [])
        context_chunks = []

        # Class mentions
        for class_id in self.get_classes():
            if class_id.lower() not in query_lower:
                continue
            class_data = self.ontology_data["classes"][class_id]
            chunk = f"Class {class_id}: {class_data.get('description', '')}\n"
            chunk += self._class_detail_lines(class_id, class_data)
            context_chunks.append(chunk)

            # Up to three direct instances as examples
            instances = self.get_instances_of_class(class_id, include_subclasses=False)[:3]
            if instances:
                instance_chunk = f"Examples of {class_id}:\n"
                for inst_id in instances:
                    props = self.graph.nodes[inst_id].get("properties", {})
                    if "name" in props:
                        instance_chunk += f"- {inst_id} ({props['name']})\n"
                    else:
                        instance_chunk += f"- {inst_id}\n"
                context_chunks.append(instance_chunk)

        # Relationship mentions
        for rel in schema_relationships:
            if rel["name"].lower() not in query_lower:
                continue
            chunk = f"Relationship {rel['name']}: {rel.get('description', '')}\n"
            chunk += f"This relationship connects {rel['domain']} to {rel['range']}.\n"
            examples = self.query_by_relationship(rel['domain'], rel['name'], rel['range'])[:3]
            if examples:
                chunk += "Examples:\n"
                for ex in examples:
                    source_name = ex["source_properties"].get("name", ex["source"])
                    target_name = ex["target_properties"].get("name", ex["target"])
                    chunk += f"- {source_name} {rel['name']} {target_name}\n"
            context_chunks.append(chunk)

        # Nothing specific found: fall back to general ontology info
        if not context_chunks:
            # Top-level classes: no parent, or a direct child of "Entity"
            top_classes = [
                c for c, data in self.ontology_data["classes"].items()
                if "subClassOf" not in data or data["subClassOf"] == "Entity"
            ]
            if top_classes:
                chunk = "Main classes in the ontology:\n"
                for cls in top_classes:
                    desc = self.ontology_data["classes"][cls].get("description", "")
                    chunk += f"- {cls}: {desc}\n"
                context_chunks.append(chunk)

            if schema_relationships:
                chunk = "Key relationships in the ontology:\n"
                for rel in schema_relationships[:5]:  # first five only
                    chunk += f"- {rel['name']}: {rel.get('description', '')}\n"
                context_chunks.append(chunk)

        return context_chunks