Spaces:
Running
Running
# Add this at the top of visualization.py with other imports | |
import re | |
import streamlit as st | |
import json | |
import networkx as nx | |
import pandas as pd | |
from typing import Dict, List, Any, Optional, Set, Tuple | |
import plotly.graph_objects as go | |
import plotly.express as px | |
import matplotlib.pyplot as plt | |
import matplotlib.colors as mcolors | |
from collections import defaultdict | |
import math | |
def analyze_query_ontology_concepts(query: str, ontology_manager) -> Tuple[List[Dict], List[Dict]]: | |
""" | |
Analyze the query to identify ontology concepts with confidence scores. | |
This is a simplified implementation that would be replaced with NLP in production. | |
""" | |
# For debugging - print the query | |
st.write(f"Debug - Analyzing query: '{query}'") | |
query_lower = query.lower() | |
# Entity detection | |
entity_mentions = [] | |
classes = ontology_manager.get_classes() | |
# Debug - print available classes | |
st.write(f"Debug - Available classes: {classes[:5]}...") | |
for class_name in classes: | |
# Check for both exact word match and partial match | |
word_match = False | |
partial_match = False | |
# Word boundary regex for exact word match | |
pattern = r'\b' + re.escape(class_name.lower()) + r'\b' | |
word_match = bool(re.search(pattern, query_lower)) | |
# Check for partial match in case class has multiple words or special spelling | |
partial_match = class_name.lower() in query_lower | |
if word_match or partial_match: | |
# Get class info | |
class_info = ontology_manager.ontology_data["classes"].get(class_name, {}) | |
# Confidence is higher for word match than partial match | |
base_confidence = 0.9 if word_match else 0.7 | |
length_factor = min(0.05, (len(class_name) / 400)) # Adjust for length but don't penalize too much | |
confidence = min(0.95, base_confidence + length_factor) | |
entity_mentions.append({ | |
"type": class_name, | |
"confidence": confidence, | |
"description": class_info.get("description", ""), | |
"match_type": "word" if word_match else "partial" | |
}) | |
# Debug - show what was found | |
st.write(f"Debug - Found {len(entity_mentions)} entity mentions") | |
# Relationship detection | |
relationship_mentions = [] | |
relationships = ontology_manager.ontology_data.get("relationships", []) | |
for rel in relationships: | |
rel_name = rel["name"] | |
# Try both word boundary and partial match | |
word_match = False | |
partial_match = False | |
pattern = r'\b' + re.escape(rel_name.lower()) + r'\b' | |
word_match = bool(re.search(pattern, query_lower)) | |
# For relationships like "ownedBy", check if it appears as part of words too | |
partial_match = rel_name.lower() in query_lower | |
if word_match or partial_match: | |
# Higher confidence for word match | |
base_confidence = 0.85 if word_match else 0.65 | |
length_factor = min(0.05, (len(rel_name) / 400)) | |
confidence = min(0.9, base_confidence + length_factor) | |
relationship_mentions.append({ | |
"name": rel_name, | |
"domain": rel["domain"], | |
"range": rel["range"], | |
"confidence": confidence, | |
"description": rel.get("description", ""), | |
"match_type": "word" if word_match else "partial" | |
}) | |
# Debug - show what was found | |
st.write(f"Debug - Found {len(relationship_mentions)} relationship mentions") | |
# Add hardcoded examples if nothing detected (fallback for debugging) | |
if not entity_mentions and not relationship_mentions: | |
st.write("Debug - No matches found, adding fallback examples") | |
# Add some fallback examples to ensure the UI displays something | |
entity_mentions.append({ | |
"type": "Customer", | |
"confidence": 0.8, | |
"description": "A person or organization that purchases products or services", | |
"match_type": "fallback" | |
}) | |
entity_mentions.append({ | |
"type": "Product", | |
"confidence": 0.75, | |
"description": "An item offered for sale or use", | |
"match_type": "fallback" | |
}) | |
relationship_mentions.append({ | |
"name": "provides", | |
"domain": "Customer", | |
"range": "Feedback", | |
"confidence": 0.7, | |
"description": "Connects customers to their feedback submissions", | |
"match_type": "fallback" | |
}) | |
# Clear debug messages before returning | |
# st.empty() | |
return entity_mentions, relationship_mentions | |
# The rest of your visualization.py file, including the fixed display_reasoning_trace function | |
def display_reasoning_trace(query: str, retrieved_docs: List[Dict], answer: str, ontology_manager): | |
"""Display an enhanced trace of how ontological reasoning was used to answer the query.""" | |
st.subheader("🧠 Ontology-Enhanced Reasoning") | |
# Create a multi-tab interface for different aspects of reasoning | |
tab1, tab2, tab3 = st.tabs(["Query Analysis", "Knowledge Retrieval", "Reasoning Path"]) | |
with tab1: | |
# Extract entity and relationship mentions with confidence | |
entity_mentions, relationship_mentions = analyze_query_ontology_concepts(query, ontology_manager) | |
# Display detected entities with confidence scores | |
if entity_mentions: | |
st.markdown("### Entities Detected in Query") | |
# Convert to DataFrame for visualization | |
entity_df = pd.DataFrame([{ | |
"Entity Type": e["type"], | |
"Confidence": e["confidence"], | |
"Description": e["description"] | |
} for e in entity_mentions]) | |
# Sort by confidence | |
entity_df = entity_df.sort_values("Confidence", ascending=False) | |
# Create a horizontal bar chart | |
fig = px.bar(entity_df, | |
x="Confidence", | |
y="Entity Type", | |
orientation='h', | |
title="Entity Type Detection Confidence", | |
color="Confidence", | |
color_continuous_scale="Blues", | |
text="Confidence") | |
fig.update_traces(texttemplate='%{text:.0%}', textposition='outside') | |
fig.update_layout(xaxis_tickformat=".0%") | |
st.plotly_chart(fig, use_container_width=True) | |
# Display descriptions | |
st.subheader("Entity Type Descriptions") | |
st.dataframe( | |
entity_df[["Entity Type", "Description"]], | |
hide_index=True | |
) | |
# Display detected relationships | |
if relationship_mentions: | |
st.markdown("### Relationships Detected in Query") | |
# Convert to DataFrame | |
rel_df = pd.DataFrame([{ | |
"Relationship": r["name"], | |
"From": r["domain"], | |
"To": r["range"], | |
"Confidence": r["confidence"], | |
"Description": r["description"] | |
} for r in relationship_mentions]) | |
# Sort by confidence | |
rel_df = rel_df.sort_values("Confidence", ascending=False) | |
# Create visualization | |
fig = px.bar(rel_df, | |
x="Confidence", | |
y="Relationship", | |
orientation='h', | |
title="Relationship Detection Confidence", | |
color="Confidence", | |
color_continuous_scale="Reds", | |
text="Confidence") | |
fig.update_traces(texttemplate='%{text:.0%}', textposition='outside') | |
fig.update_layout(xaxis_tickformat=".0%") | |
st.plotly_chart(fig, use_container_width=True) | |
# Display relationship details | |
st.subheader("Relationship Details") | |
st.dataframe( | |
rel_df[["Relationship", "From", "To", "Description"]], | |
hide_index=True | |
) | |
with tab2: | |
# Create an enhanced visualization of the retrieval process | |
st.markdown("### Knowledge Retrieval Process") | |
# Group retrieved documents by source | |
docs_by_source = defaultdict(list) | |
for doc in retrieved_docs: | |
if hasattr(doc, 'metadata'): | |
source = doc.metadata.get('source', 'unknown') | |
docs_by_source[source].append(doc) | |
else: | |
docs_by_source['unknown'].append(doc) | |
# Display retrieval visualization | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
# Create a Sankey diagram to show flow from query to sources to answer | |
display_retrieval_flow(query, docs_by_source) | |
with col2: | |
# Display source distribution | |
source_counts = {source: len(docs) for source, docs in docs_by_source.items()} | |
# Create a pie chart | |
fig = px.pie( | |
values=list(source_counts.values()), | |
names=list(source_counts.keys()), | |
title="Retrieved Context Sources", | |
color_discrete_sequence=px.colors.qualitative.Plotly | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
# Display retrieved document details in expandable sections | |
for source, docs in docs_by_source.items(): | |
st.subheader(f"{source} ({len(docs)})") | |
for i, doc in enumerate(docs): | |
# Add separator between documents | |
if i > 0: | |
st.markdown("---") | |
# Display document content | |
if hasattr(doc, 'page_content'): | |
st.markdown(f"**Content:**") | |
# Format depending on source | |
if source in ["ontology", "ontology_context"]: | |
st.markdown(doc.page_content) | |
else: | |
st.code(doc.page_content) | |
# Display metadata if present | |
if hasattr(doc, 'metadata') and doc.metadata: | |
st.markdown("**Metadata:**") | |
for key, value in doc.metadata.items(): | |
if key != 'source': # Already shown in section title | |
st.markdown(f"- **{key}**: {value}") | |
with tab3: | |
# Show the reasoning flow from query to answer | |
st.markdown("### Ontological Reasoning Process") | |
# Display reasoning steps | |
reasoning_steps = generate_reasoning_steps(query, entity_mentions, relationship_mentions, retrieved_docs, answer) | |
for i, step in enumerate(reasoning_steps): | |
with st.expander(f"Step {i+1}: {step['title']}", expanded=i == 0): | |
st.markdown(step["description"]) | |
# Visualization of how ontological structure influenced the answer | |
st.markdown("### How Ontology Enhanced the Answer") | |
# Display ontology advantage explanation | |
advantages = explain_ontology_advantages(entity_mentions, relationship_mentions) | |
for adv in advantages: | |
st.markdown(f"**{adv['title']}**") | |
st.markdown(adv["description"]) | |
# The rest of the code remains the same as in your original visualization.py file | |
def display_retrieval_flow(query: str, docs_by_source: Dict[str, List]): | |
"""Create a Sankey diagram showing the flow from query to sources to answer.""" | |
# Define node labels | |
nodes = ["Query"] | |
# Add source nodes | |
for source in docs_by_source.keys(): | |
nodes.append(f"Source: {source.capitalize()}") | |
nodes.append("Answer") | |
# Define links | |
source_indices = [] | |
target_indices = [] | |
values = [] | |
# Links from query to sources | |
for i, (source, docs) in enumerate(docs_by_source.items()): | |
source_indices.append(0) # Query is index 0 | |
target_indices.append(i + 1) # Source indices start at 1 | |
values.append(len(docs)) # Width based on number of docs | |
# Links from sources to answer | |
for i in range(len(docs_by_source)): | |
source_indices.append(i + 1) # Source index | |
target_indices.append(len(nodes) - 1) # Answer is last node | |
values.append(values[i]) # Same width as query to source | |
# Create Sankey diagram | |
fig = go.Figure(data=[go.Sankey( | |
node=dict( | |
pad=15, | |
thickness=20, | |
line=dict(color="black", width=0.5), | |
label=nodes, | |
color=["#1f77b4"] + [px.colors.qualitative.Plotly[i % len(px.colors.qualitative.Plotly)] | |
for i in range(len(docs_by_source))] + ["#2ca02c"] | |
), | |
link=dict( | |
source=source_indices, | |
target=target_indices, | |
value=values | |
) | |
)]) | |
fig.update_layout( | |
title="Information Flow in RAG Process", | |
font=dict(size=12) | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
def generate_reasoning_steps(query: str, entity_mentions: List[Dict], relationship_mentions: List[Dict], | |
retrieved_docs: List[Dict], answer: str) -> List[Dict]: | |
"""Generate reasoning steps to explain how the system arrived at the answer.""" | |
steps = [] | |
# Step 1: Query Understanding | |
steps.append({ | |
"title": "Query Understanding", | |
"description": f"""The system analyzes the query "{query}" and identifies key concepts from the ontology. | |
{len(entity_mentions)} entity types and {len(relationship_mentions)} relationship types are recognized, allowing | |
the system to understand the semantic context of the question.""" | |
}) | |
# Step 2: Knowledge Retrieval | |
if retrieved_docs: | |
doc_count = len(retrieved_docs) | |
ontology_count = sum(1 for doc in retrieved_docs if hasattr(doc, 'metadata') and | |
doc.metadata.get('source', '') in ['ontology', 'ontology_context']) | |
steps.append({ | |
"title": "Knowledge Retrieval", | |
"description": f"""Based on the identified concepts, the system retrieves {doc_count} relevant pieces of information, | |
including {ontology_count} from the structured ontology. This hybrid approach combines traditional vector retrieval | |
with ontology-aware semantic retrieval, enabling access to both explicit and implicit knowledge.""" | |
}) | |
# Step 3: Relationship Traversal | |
if relationship_mentions: | |
rel_names = [r["name"] for r in relationship_mentions] | |
steps.append({ | |
"title": "Relationship Traversal", | |
"description": f"""The system identifies key relationships in the ontology: {', '.join(rel_names)}. | |
By traversing these relationships, the system can connect concepts that might not appear together in the same text, | |
allowing for multi-hop reasoning across the knowledge graph.""" | |
}) | |
# Step 4: Ontological Inference | |
if entity_mentions: | |
entity_types = [e["type"] for e in entity_mentions] | |
steps.append({ | |
"title": "Ontological Inference", | |
"description": f"""Using the hierarchical structure of entities like {', '.join(entity_types)}, | |
the system makes inferences based on class inheritance and relationship constraints defined in the ontology. | |
This allows it to reason about properties and relationships that might not be explicitly stated.""" | |
}) | |
# Step 5: Answer Generation | |
steps.append({ | |
"title": "Answer Synthesis", | |
"description": f"""Finally, the system synthesizes the retrieved information and ontological knowledge to generate a comprehensive answer. | |
The structured nature of the ontology ensures that the answer accurately reflects the relationships between concepts | |
and respects the business rules defined in the knowledge model.""" | |
}) | |
return steps | |
def explain_ontology_advantages(entity_mentions: List[Dict], relationship_mentions: List[Dict]) -> List[Dict]: | |
"""Explain how ontology enhanced the RAG process.""" | |
advantages = [] | |
if entity_mentions: | |
advantages.append({ | |
"title": "Hierarchical Knowledge Representation", | |
"description": """The ontology provides a hierarchical class structure that enables the system to understand | |
that concepts are related through is-a relationships. For instance, knowing that a Manager is an Employee | |
allows the system to apply Employee-related knowledge when answering questions about Managers, even if | |
the specific information was only stated for Employees in general.""" | |
}) | |
if relationship_mentions: | |
advantages.append({ | |
"title": "Explicit Relationship Semantics", | |
"description": """The ontology defines explicit relationships between concepts with clear semantics. | |
This allows the system to understand how entities are connected beyond simple co-occurrence in text. | |
For example, understanding that 'ownedBy' connects Products to Departments helps answer questions | |
about product ownership and departmental responsibilities.""" | |
}) | |
advantages.append({ | |
"title": "Constraint-Based Reasoning", | |
"description": """Business rules in the ontology provide constraints that guide the reasoning process. | |
These rules ensure the system's answers are consistent with the organization's policies and practices. | |
For instance, rules about approval workflows or data classification requirements can inform answers | |
about process-related questions.""" | |
}) | |
advantages.append({ | |
"title": "Cross-Domain Knowledge Integration", | |
"description": """The ontology connects concepts across different domains of the enterprise, enabling | |
integrated reasoning that traditional document-based retrieval might miss. This allows the system to | |
answer questions that span organizational boundaries, such as how marketing decisions affect product | |
development or how customer feedback influences business strategy.""" | |
}) | |
return advantages | |
def render_html_in_streamlit(html_content: str): | |
"""Display HTML content in Streamlit using components.html.""" | |
import streamlit.components.v1 as components | |
# Directly render the HTML using components.html | |
components.html(html_content, height=600, scrolling=True) | |
def display_ontology_stats(ontology_manager): | |
"""Display statistics and visualizations about the ontology.""" | |
st.subheader("📊 Ontology Structure and Statistics") | |
# Get basic stats | |
classes = ontology_manager.get_classes() | |
class_hierarchy = ontology_manager.get_class_hierarchy() | |
# Count instances per class | |
class_counts = [] | |
for class_name in classes: | |
instance_count = len(ontology_manager.get_instances_of_class(class_name, include_subclasses=False)) | |
class_counts.append({ | |
"Class": class_name, | |
"Instances": instance_count | |
}) | |
# Display summary metrics | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
st.metric("Total Classes", len(classes)) | |
# Count total instances | |
total_instances = sum(item["Instances"] for item in class_counts) | |
with col2: | |
st.metric("Total Instances", total_instances) | |
# Count relationships | |
relationship_count = len(ontology_manager.ontology_data.get("relationships", [])) | |
with col3: | |
st.metric("Relationship Types", relationship_count) | |
# Visualize class hierarchy | |
st.markdown("### Class Hierarchy") | |
# Create tabs for different views | |
tab1, tab2, tab3 = st.tabs(["Tree View", "Class Statistics", "Hierarchy Graph"]) | |
with tab1: | |
# Create a collapsible tree view of class hierarchy | |
display_class_hierarchy_tree(ontology_manager, class_hierarchy) | |
with tab2: | |
# Display class stats and distribution | |
if class_counts: | |
# Filter to only show classes with instances | |
non_empty_classes = [item for item in class_counts if item["Instances"] > 0] | |
if non_empty_classes: | |
df = pd.DataFrame(non_empty_classes) | |
df = df.sort_values("Instances", ascending=False) | |
# Create horizontal bar chart | |
fig = px.bar(df, | |
x="Instances", | |
y="Class", | |
orientation='h', | |
title="Instances per Class", | |
color="Instances", | |
color_continuous_scale="viridis") | |
fig.update_layout(yaxis={'categoryorder':'total ascending'}) | |
st.plotly_chart(fig, use_container_width=True) | |
else: | |
st.info("No classes with instances found.") | |
# Show distribution of classes by inheritance depth | |
display_class_depth_distribution(ontology_manager) | |
with tab3: | |
# Display class hierarchy as a graph | |
display_class_hierarchy_graph(ontology_manager) | |
# Relationship statistics | |
st.markdown("### Relationship Analysis") | |
# Get relationship usage statistics | |
relationship_usage = analyze_relationship_usage(ontology_manager) | |
# Display relationship usage in a table and chart | |
if relationship_usage: | |
tab1, tab2 = st.tabs(["Usage Statistics", "Domain/Range Distribution"]) | |
with tab1: | |
# Create DataFrame for the table | |
df = pd.DataFrame(relationship_usage) | |
df = df.sort_values("Usage Count", ascending=False) | |
# Show table | |
st.dataframe(df) | |
# Create bar chart for relationship usage | |
fig = px.bar(df, | |
x="Relationship", | |
y="Usage Count", | |
title="Relationship Usage Frequency", | |
color="Usage Count", | |
color_continuous_scale="blues") | |
st.plotly_chart(fig, use_container_width=True) | |
with tab2: | |
# Display domain-range distribution | |
display_domain_range_distribution(ontology_manager) | |
def display_class_hierarchy_tree(ontology_manager, class_hierarchy): | |
"""Display class hierarchy using expanders at root level, and plain list for subclasses.""" | |
# Find all subcategory sets | |
all_subclasses = set() | |
for subclasses in class_hierarchy.values(): | |
all_subclasses.update(subclasses) | |
# Root classes are those that are not subclassed by any class | |
root_classes = [cls for cls in ontology_manager.get_classes() if cls not in all_subclasses] | |
for root_class in sorted(root_classes): | |
class_info = ontology_manager.ontology_data["classes"].get(root_class, {}) | |
description = class_info.get("description", "") | |
instance_count = len(ontology_manager.get_instances_of_class(root_class, include_subclasses=False)) | |
properties = class_info.get("properties", []) | |
with st.expander(f"📁 {root_class} ({instance_count} instances)", expanded=True): | |
st.markdown(f"**Description:** {description}") | |
if properties: | |
st.markdown("**Properties:**") | |
st.markdown(", ".join(properties)) | |
subclasses = class_hierarchy.get(root_class, []) | |
if subclasses: | |
st.markdown("**Subclasses:**") | |
for subclass in sorted(subclasses): | |
display_subclass_plain(subclass, class_hierarchy, ontology_manager, indent=1) | |
else: | |
st.markdown("*No subclasses*") | |
def display_subclass_plain(class_name, class_hierarchy, ontology_manager, indent=1): | |
"""Use indentation to render subclass info without nested expanders.""" | |
indent_str = " " * indent | |
class_info = ontology_manager.ontology_data["classes"].get(class_name, {}) | |
description = class_info.get("description", "") | |
properties = class_info.get("properties", []) | |
instance_count = len(ontology_manager.get_instances_of_class(class_name, include_subclasses=False)) | |
st.markdown(f"{indent_str}🔹 **{class_name}** ({instance_count} instances)") | |
st.markdown(f"{indent_str}> {description}") | |
if properties: | |
st.markdown(f"{indent_str}*Properties:* {', '.join(properties)}") | |
subclasses = class_hierarchy.get(class_name, []) | |
for subclass in sorted(subclasses): | |
display_subclass_plain(subclass, class_hierarchy, ontology_manager, indent + 1) | |
def get_class_depths(ontology_manager) -> Dict[str, int]: | |
"""Calculate the inheritance depth of each class.""" | |
depths = {} | |
class_data = ontology_manager.ontology_data["classes"] | |
def get_depth(class_name): | |
# If we've already calculated the depth, return it | |
if class_name in depths: | |
return depths[class_name] | |
# Get the class data | |
cls = class_data.get(class_name, {}) | |
# If no parent, depth is 0 | |
if "subClassOf" not in cls: | |
depths[class_name] = 0 | |
return 0 | |
# Otherwise, depth is 1 + parent's depth | |
parent = cls["subClassOf"] | |
parent_depth = get_depth(parent) | |
depths[class_name] = parent_depth + 1 | |
return depths[class_name] | |
# Calculate depths for all classes | |
for class_name in class_data: | |
get_depth(class_name) | |
return depths | |
def display_class_depth_distribution(ontology_manager): | |
"""Display distribution of classes by inheritance depth.""" | |
depths = get_class_depths(ontology_manager) | |
# Count classes at each depth | |
depth_counts = defaultdict(int) | |
for _, depth in depths.items(): | |
depth_counts[depth] += 1 | |
# Create dataframe | |
df = pd.DataFrame([ | |
{"Depth": depth, "Count": count} | |
for depth, count in depth_counts.items() | |
]) | |
if not df.empty: | |
df = df.sort_values("Depth") | |
# Create bar chart | |
fig = px.bar(df, | |
x="Depth", | |
y="Count", | |
title="Class Distribution by Inheritance Depth", | |
labels={"Depth": "Inheritance Depth", "Count": "Number of Classes"}, | |
color="Count", | |
text="Count") | |
fig.update_traces(texttemplate='%{text}', textposition='outside') | |
fig.update_layout(uniformtext_minsize=8, uniformtext_mode='hide') | |
st.plotly_chart(fig, use_container_width=True) | |
def display_class_hierarchy_graph(ontology_manager): | |
"""Display class hierarchy as a directed graph.""" | |
# Create a directed graph | |
G = nx.DiGraph() | |
# Add nodes for each class | |
for class_name, class_info in ontology_manager.ontology_data["classes"].items(): | |
# Count direct instances | |
instance_count = len(ontology_manager.get_instances_of_class(class_name, include_subclasses=False)) | |
# Add node with attributes | |
G.add_node(class_name, | |
type="class", | |
description=class_info.get("description", ""), | |
instance_count=instance_count) | |
# Add edge for subclass relationship | |
if "subClassOf" in class_info: | |
parent = class_info["subClassOf"] | |
G.add_edge(parent, class_name, relationship="subClassOf") | |
# Create a Plotly graph visualization | |
# Calculate node positions using a hierarchical layout without pygraphviz | |
# Find root nodes | |
roots = [n for n, d in G.in_degree() if d == 0] | |
# Use kamada_kawai_layout or spring_layout as alternative | |
if len(G) > 1: | |
try: | |
# Try to make a hierarchical-like layout using springs | |
pos = nx.spring_layout(G, iterations=50, seed=42) | |
# Adjust y-coordinates to create a more hierarchical appearance | |
# First get the topological generations | |
generations = list(nx.topological_generations(G)) | |
# Assign y-coordinate based on generation | |
for i, gen in enumerate(generations): | |
y_pos = 1.0 - (i / max(1, len(generations) - 1)) | |
for node in gen: | |
if node in pos: | |
pos[node] = (pos[node][0], y_pos) | |
except Exception: | |
# Fallback to simple spring layout if there's an issue | |
pos = nx.spring_layout(G, seed=42) | |
else: | |
# For a single node | |
pos = {list(G.nodes())[0]: (0.5, 0.5)} if G.nodes() else {} | |
# Convert positions to lists for Plotly | |
node_x = [] | |
node_y = [] | |
node_text = [] | |
node_size = [] | |
node_color = [] | |
for node in G.nodes(): | |
x, y = pos[node] | |
node_x.append(x) | |
node_y.append(y) | |
# Get node info for hover text | |
description = G.nodes[node].get("description", "") | |
instance_count = G.nodes[node].get("instance_count", 0) | |
# Prepare hover text | |
hover_text = f"Class: {node}<br>Description: {description}<br>Instances: {instance_count}" | |
node_text.append(hover_text) | |
# Size nodes by instance count (with a minimum size) | |
size = 10 + (instance_count * 2) | |
size = min(40, max(15, size)) # Limit size range | |
node_size.append(size) | |
# Color nodes by depth | |
depth = get_class_depths(ontology_manager).get(node, 0) | |
# Use a color scale from light to dark blue | |
node_color.append(depth) | |
# Create edge traces | |
edge_x = [] | |
edge_y = [] | |
for edge in G.edges(): | |
x0, y0 = pos[edge[0]] | |
x1, y1 = pos[edge[1]] | |
# Add a curved line with multiple points | |
edge_x.append(x0) | |
edge_x.append(x1) | |
edge_x.append(None) # Add None to create a break between edges | |
edge_y.append(y0) | |
edge_y.append(y1) | |
edge_y.append(None) | |
# Create node trace | |
node_trace = go.Scatter( | |
x=node_x, y=node_y, | |
mode='markers+text', | |
text=[node for node in G.nodes()], | |
textposition="bottom center", | |
hoverinfo='text', | |
hovertext=node_text, | |
marker=dict( | |
showscale=True, | |
colorscale='Blues', | |
color=node_color, | |
size=node_size, | |
line=dict(width=2, color='DarkSlateGrey'), | |
colorbar=dict( | |
title="Depth", | |
thickness=15, | |
tickvals=[0, max(node_color)], | |
ticktext=["Root", f"Depth {max(node_color)}"] | |
) | |
) | |
) | |
# Create edge trace | |
edge_trace = go.Scatter( | |
x=edge_x, y=edge_y, | |
line=dict(width=1, color='#888'), | |
hoverinfo='none', | |
mode='lines' | |
) | |
# Create figure | |
fig = go.Figure(data=[edge_trace, node_trace], | |
layout=go.Layout( | |
showlegend=False, | |
hovermode='closest', | |
margin=dict(b=20, l=5, r=5, t=40), | |
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
title="Class Hierarchy Graph", | |
title_x=0.5 | |
)) | |
# Display the figure | |
st.plotly_chart(fig, use_container_width=True) | |
def analyze_relationship_usage(ontology_manager) -> List[Dict]: | |
"""Analyze how relationships are used in the ontology.""" | |
relationship_data = ontology_manager.ontology_data.get("relationships", []) | |
instances = ontology_manager.ontology_data.get("instances", []) | |
# Initialize counters | |
usage_counts = defaultdict(int) | |
# Count relationship usage in instances | |
for instance in instances: | |
for rel in instance.get("relationships", []): | |
usage_counts[rel["type"]] += 1 | |
# Prepare results | |
results = [] | |
for rel in relationship_data: | |
rel_name = rel["name"] | |
domain = rel["domain"] | |
range_class = rel["range"] | |
cardinality = rel.get("cardinality", "many-to-many") | |
count = usage_counts.get(rel_name, 0) | |
results.append({ | |
"Relationship": rel_name, | |
"Domain": domain, | |
"Range": range_class, | |
"Cardinality": cardinality, | |
"Usage Count": count | |
}) | |
return results | |
def display_domain_range_distribution(ontology_manager): | |
"""Display domain and range distribution for relationships.""" | |
relationship_data = ontology_manager.ontology_data.get("relationships", []) | |
# Count domains and ranges | |
domain_counts = defaultdict(int) | |
range_counts = defaultdict(int) | |
for rel in relationship_data: | |
domain_counts[rel["domain"]] += 1 | |
range_counts[rel["range"]] += 1 | |
# Create DataFrames | |
domain_df = pd.DataFrame([ | |
{"Class": cls, "Count": count, "Type": "Domain"} | |
for cls, count in domain_counts.items() | |
]) | |
range_df = pd.DataFrame([ | |
{"Class": cls, "Count": count, "Type": "Range"} | |
for cls, count in range_counts.items() | |
]) | |
# Combine | |
combined_df = pd.concat([domain_df, range_df]) | |
# Create plot | |
if not combined_df.empty: | |
fig = px.bar(combined_df, | |
x="Class", | |
y="Count", | |
color="Type", | |
barmode="group", | |
title="Classes as Domain vs Range in Relationships", | |
color_discrete_map={"Domain": "#1f77b4", "Range": "#ff7f0e"}) | |
fig.update_layout(xaxis={'categoryorder':'total descending'}) | |
st.plotly_chart(fig, use_container_width=True) | |
def display_entity_details(entity_info: Dict[str, Any], ontology_manager): | |
"""Display detailed information about an entity.""" | |
if not entity_info: | |
st.warning("Entity not found.") | |
return | |
st.subheader(f"📝 Entity: {entity_info['id']}") | |
# Determine entity type and get class hierarchy | |
entity_type = entity_info.get("type", "") | |
class_type = entity_info.get("class", entity_info.get("class_type", "")) | |
class_hierarchy = [] | |
if class_type: | |
current_class = class_type | |
while current_class: | |
class_hierarchy.append(current_class) | |
parent_class = ontology_manager.ontology_data["classes"].get(current_class, {}).get("subClassOf", "") | |
if not parent_class or parent_class == current_class: # Prevent infinite loops | |
break | |
current_class = parent_class | |
# Display entity metadata | |
col1, col2 = st.columns([1, 2]) | |
with col1: | |
st.markdown("### Basic Information") | |
# Basic info metrics | |
st.metric("Entity Type", entity_type) | |
if class_type: | |
st.metric("Class", class_type) | |
# Display class hierarchy | |
if class_hierarchy and len(class_hierarchy) > 1: | |
st.markdown("**Class Hierarchy:**") | |
hierarchy_str = " → ".join(reversed(class_hierarchy)) | |
st.markdown(f"```\n{hierarchy_str}\n```") | |
with col2: | |
# Display class description if available | |
if "class_description" in entity_info: | |
st.markdown("### Description") | |
st.markdown(entity_info.get("class_description", "No description available.")) | |
# Properties | |
if "properties" in entity_info and entity_info["properties"]: | |
st.markdown("### Properties") | |
# Create a more structured property display | |
properties = [] | |
for key, value in entity_info["properties"].items(): | |
# Handle different value types | |
if isinstance(value, list): | |
value_str = ", ".join(str(v) for v in value) | |
else: | |
value_str = str(value) | |
properties.append({"Property": key, "Value": value_str}) | |
# Display as table with highlighting | |
property_df = pd.DataFrame(properties) | |
st.dataframe( | |
property_df, | |
column_config={ | |
"Property": st.column_config.TextColumn("Property", width="medium"), | |
"Value": st.column_config.TextColumn("Value", width="large") | |
}, | |
hide_index=True | |
) | |
# Relationships with visual enhancements | |
if "relationships" in entity_info and entity_info["relationships"]: | |
st.markdown("### Relationships") | |
# Group relationships by direction | |
outgoing = [] | |
incoming = [] | |
for rel in entity_info["relationships"]: | |
if "direction" in rel and rel["direction"] == "outgoing": | |
outgoing.append({ | |
"Relationship": rel["type"], | |
"Direction": "→", | |
"Related Entity": rel["target"] | |
}) | |
elif "direction" in rel and rel["direction"] == "incoming": | |
incoming.append({ | |
"Relationship": rel["type"], | |
"Direction": "←", | |
"Related Entity": rel["source"] | |
}) | |
# Create tabs for outgoing and incoming | |
if outgoing or incoming: | |
tab1, tab2 = st.tabs(["Outgoing Relationships", "Incoming Relationships"]) | |
with tab1: | |
if outgoing: | |
st.dataframe( | |
pd.DataFrame(outgoing), | |
column_config={ | |
"Relationship": st.column_config.TextColumn("Relationship Type", width="medium"), | |
"Direction": st.column_config.TextColumn("Direction", width="small"), | |
"Related Entity": st.column_config.TextColumn("Target Entity", width="medium") | |
}, | |
hide_index=True | |
) | |
else: | |
st.info("No outgoing relationships.") | |
with tab2: | |
if incoming: | |
st.dataframe( | |
pd.DataFrame(incoming), | |
column_config={ | |
"Relationship": st.column_config.TextColumn("Relationship Type", width="medium"), | |
"Direction": st.column_config.TextColumn("Direction", width="small"), | |
"Related Entity": st.column_config.TextColumn("Source Entity", width="medium") | |
}, | |
hide_index=True | |
) | |
else: | |
st.info("No incoming relationships.") | |
# Visual relationship graph | |
st.markdown("#### Relationship Graph") | |
display_entity_relationship_graph(entity_info, ontology_manager) | |
def display_entity_relationship_graph(entity_info: Dict[str, Any], ontology_manager): | |
"""Display a graph of an entity's relationships.""" | |
entity_id = entity_info["id"] | |
# Create graph | |
G = nx.DiGraph() | |
# Add central entity | |
G.add_node(entity_id, type="central") | |
# Add related entities and relationships | |
for rel in entity_info.get("relationships", []): | |
if "direction" in rel and rel["direction"] == "outgoing": | |
target = rel["target"] | |
rel_type = rel["type"] | |
# Add target node if not exists | |
if target not in G: | |
target_info = ontology_manager.get_entity_info(target) | |
node_type = target_info.get("type", "unknown") | |
G.add_node(target, type=node_type) | |
# Add edge | |
G.add_edge(entity_id, target, type=rel_type) | |
elif "direction" in rel and rel["direction"] == "incoming": | |
source = rel["source"] | |
rel_type = rel["type"] | |
# Add source node if not exists | |
if source not in G: | |
source_info = ontology_manager.get_entity_info(source) | |
node_type = source_info.get("type", "unknown") | |
G.add_node(source, type=node_type) | |
# Add edge | |
G.add_edge(source, entity_id, type=rel_type) | |
# Use a force-directed layout | |
pos = nx.spring_layout(G, k=0.5, iterations=50) | |
# Create Plotly figure | |
fig = go.Figure() | |
# Add edges with curved lines | |
for source, target, data in G.edges(data=True): | |
x0, y0 = pos[source] | |
x1, y1 = pos[target] | |
rel_type = data.get("type", "unknown") | |
# Calculate edge midpoint for label | |
mid_x = (x0 + x1) / 2 | |
mid_y = (y0 + y1) / 2 | |
# Draw edge | |
fig.add_trace(go.Scatter( | |
x=[x0, x1], | |
y=[y0, y1], | |
mode="lines", | |
line=dict(width=1, color="#888"), | |
hoverinfo="text", | |
hovertext=f"Relationship: {rel_type}", | |
showlegend=False | |
)) | |
# Add relationship label | |
fig.add_trace(go.Scatter( | |
x=[mid_x], | |
y=[mid_y], | |
mode="text", | |
text=[rel_type], | |
textposition="middle center", | |
textfont=dict(size=10, color="#555"), | |
hoverinfo="none", | |
showlegend=False | |
)) | |
# Add nodes with different colors by type | |
node_groups = defaultdict(list) | |
for node, data in G.nodes(data=True): | |
node_type = data.get("type", "unknown") | |
node_info = ontology_manager.get_entity_info(node) | |
# Get friendly name if available | |
name = node | |
if "properties" in node_info and "name" in node_info["properties"]: | |
name = node_info["properties"]["name"] | |
node_groups[node_type].append({ | |
"id": node, | |
"name": name, | |
"x": pos[node][0], | |
"y": pos[node][1], | |
"info": node_info | |
}) | |
# Define colors for different node types | |
colors = { | |
"central": "#ff7f0e", # Highlighted color for central entity | |
"instance": "#1f77b4", | |
"class": "#2ca02c", | |
"unknown": "#d62728" | |
} | |
# Add each node group with appropriate styling | |
for node_type, nodes in node_groups.items(): | |
# Default to unknown color if type not in map | |
color = colors.get(node_type, colors["unknown"]) | |
x = [node["x"] for node in nodes] | |
y = [node["y"] for node in nodes] | |
text = [node["name"] for node in nodes] | |
# Prepare hover text | |
hover_text = [] | |
for node in nodes: | |
info = node["info"] | |
hover = f"ID: {node['id']}<br>Name: {node['name']}" | |
if "class_type" in info: | |
hover += f"<br>Type: {info['class_type']}" | |
hover_text.append(hover) | |
# Adjust size for central entity | |
size = 20 if node_type == "central" else 15 | |
fig.add_trace(go.Scatter( | |
x=x, | |
y=y, | |
mode="markers+text", | |
marker=dict( | |
size=size, | |
color=color, | |
line=dict(width=2, color="white") | |
), | |
text=text, | |
textposition="bottom center", | |
hoverinfo="text", | |
hovertext=hover_text, | |
name=node_type.capitalize() | |
)) | |
# Update layout | |
fig.update_layout( | |
title=f"Relationships for {entity_id}", | |
title_x=0.5, | |
showlegend=True, | |
hovermode="closest", | |
margin=dict(b=20, l=5, r=5, t=40), | |
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
height=500 | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
def display_graph_visualization(knowledge_graph, central_entity=None, max_distance=2): | |
"""Display an interactive visualization of the knowledge graph.""" | |
st.subheader("🕸️ Knowledge Graph Visualization") | |
# Controls for the visualization | |
with st.expander("Visualization Settings", expanded=True): | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
include_classes = st.checkbox("Include Classes", value=True) | |
with col2: | |
include_instances = st.checkbox("Include Instances", value=True) | |
with col3: | |
include_properties = st.checkbox("Include Properties", value=False) | |
st.markdown("---") | |
col1, col2 = st.columns(2) | |
with col1: | |
max_distance = st.slider("Max Relationship Distance", 1, 5, max_distance) | |
with col2: | |
layout_algorithm = st.selectbox( | |
"Layout Algorithm", | |
["Force-Directed", "Hierarchical", "Radial", "Circular"], | |
index=0 | |
) | |
# Generate HTML visualization | |
html = knowledge_graph.generate_html_visualization( | |
include_classes=include_classes, | |
include_instances=include_instances, | |
central_entity=central_entity, | |
max_distance=max_distance, | |
include_properties=include_properties, | |
layout_algorithm=layout_algorithm.lower() | |
) | |
# Render the HTML | |
render_html_in_streamlit(html) | |
# Entity filter | |
with st.expander("Focus on Entity", expanded=central_entity is not None): | |
# Get all entities | |
entities = [] | |
for class_name in knowledge_graph.ontology_manager.get_classes(): | |
entities.extend(knowledge_graph.ontology_manager.get_instances_of_class(class_name)) | |
# Deduplicate | |
entities = sorted(set(entities)) | |
# Select entity | |
selected_entity = st.selectbox( | |
"Select Entity to Focus On", | |
["None"] + entities, | |
index=0 if central_entity is None else entities.index(central_entity) + 1 | |
) | |
if selected_entity != "None": | |
st.button("Focus Graph", on_click=lambda: st.experimental_rerun()) | |
# Display graph statistics | |
stats = knowledge_graph.get_graph_statistics() | |
if stats: | |
st.markdown("### Graph Statistics") | |
col1, col2, col3, col4 = st.columns(4) | |
col1.metric("Nodes", stats.get("node_count", 0)) | |
col2.metric("Edges", stats.get("edge_count", 0)) | |
col3.metric("Classes", stats.get("class_count", 0)) | |
col4.metric("Instances", stats.get("instance_count", 0)) | |
# Display relationship counts | |
if "relationship_counts" in stats: | |
rel_counts = stats["relationship_counts"] | |
rel_data = [{"Relationship": rel, "Count": count} for rel, count in rel_counts.items() | |
if rel not in ["subClassOf", "instanceOf"]] # Filter out structural relationships | |
if rel_data: | |
df = pd.DataFrame(rel_data) | |
fig = px.bar(df, | |
x="Relationship", | |
y="Count", | |
title="Relationship Distribution", | |
color="Count", | |
color_continuous_scale="viridis") | |
st.plotly_chart(fig, use_container_width=True) | |
def visualize_path(path_info, ontology_manager): | |
"""Visualize a semantic path between entities with enhanced graphics and details.""" | |
import streamlit as st | |
import networkx as nx | |
import matplotlib.pyplot as plt | |
from collections import defaultdict | |
if not path_info or "path" not in path_info: | |
st.warning("No path information available.") | |
return | |
st.subheader("🔄 Semantic Path Visualization") | |
path = path_info["path"] | |
# Get entity information for each node in the path | |
entities = {} | |
all_nodes = set() | |
if "source" in path_info: | |
source_id = path_info["source"] | |
all_nodes.add(source_id) | |
entities[source_id] = ontology_manager.get_entity_info(source_id) | |
if "target" in path_info: | |
target_id = path_info["target"] | |
all_nodes.add(target_id) | |
entities[target_id] = ontology_manager.get_entity_info(target_id) | |
for edge in path: | |
source_id = edge["source"] | |
target_id = edge["target"] | |
all_nodes.add(source_id) | |
all_nodes.add(target_id) | |
if source_id not in entities: | |
entities[source_id] = ontology_manager.get_entity_info(source_id) | |
if target_id not in entities: | |
entities[target_id] = ontology_manager.get_entity_info(target_id) | |
tab1, tab2, tab3 = st.tabs(["Path Visualization", "Entity Details", "Path Summary"]) | |
with tab1: | |
G = nx.DiGraph() | |
for edge in path: | |
G.add_edge(edge["source"], edge["target"], label=edge["type"]) | |
pos = nx.spring_layout(G) | |
edge_labels = nx.get_edge_attributes(G, 'label') | |
fig, ax = plt.subplots() | |
nx.draw(G, pos, with_labels=True, node_color='lightblue', node_size=2000, font_size=10, ax=ax) | |
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color='red', ax=ax) | |
st.pyplot(fig) | |
with tab2: | |
st.markdown("### Entities in Path") | |
entities_by_type = defaultdict(list) | |
for entity_id in all_nodes: | |
entity_info = entities.get(entity_id, {}) | |
entity_type = entity_info.get("class_type", entity_info.get("class", "Unknown")) | |
entities_by_type[entity_type].append((entity_id, entity_info)) | |
for entity_type, entity_list in entities_by_type.items(): | |
st.subheader(f"{entity_type} ({len(entity_list)})") | |
for entity_id, entity_info in entity_list: | |
st.markdown(f"- **{entity_id}**") | |
for k, v in entity_info.get("properties", {}).items(): | |
st.markdown(f" - {k}: {v}") | |
st.markdown("---") | |
with tab3: | |
st.markdown("### Path Description") | |
if "text" in path_info and path_info["text"]: | |
st.markdown(f"**Path:** {path_info['text']}") | |
else: | |
path_steps = [] | |
for edge in path: | |
source_id = edge["source"] | |
target_id = edge["target"] | |
relation = edge["type"] | |
source_name = entities[source_id].get("properties", {}).get("name", source_id) | |
target_name = entities[target_id].get("properties", {}).get("name", target_id) | |
path_steps.append(f"{source_name} **{relation}** {target_name}") | |
st.markdown(" → ".join(path_steps)) | |
relevant_rules = find_relevant_rules_for_path(path, ontology_manager) | |
if relevant_rules: | |
st.markdown("### Relevant Business Rules") | |
for rule in relevant_rules: | |
st.markdown(f"- **{rule['id']}**: {rule['description']}") | |
def display_path_visualization(path, entities): | |
"""Create an enhanced visual representation of the path.""" | |
if not path: | |
st.info("Path is empty.") | |
return | |
# Create nodes and positions | |
nodes = [] | |
x_positions = {} | |
# Collect all unique nodes in the path | |
unique_nodes = set() | |
for edge in path: | |
unique_nodes.add(edge["source"]) | |
unique_nodes.add(edge["target"]) | |
# Create ordered list of nodes | |
path_nodes = [] | |
if path: | |
# Start with the first source | |
current_node = path[0]["source"] | |
path_nodes.append(current_node) | |
# Follow the path | |
for edge in path: | |
target = edge["target"] | |
path_nodes.append(target) | |
current_node = target | |
else: | |
# If no path, just use the unique nodes | |
path_nodes = list(unique_nodes) | |
# Assign positions along a line | |
for i, node_id in enumerate(path_nodes): | |
x_positions[node_id] = i | |
# Get node info | |
entity_info = entities.get(node_id, {}) | |
properties = entity_info.get("properties", {}) | |
entity_type = entity_info.get("class_type", entity_info.get("class", "Unknown")) | |
# Get display name | |
name = properties.get("name", node_id) | |
nodes.append({ | |
"id": node_id, | |
"name": name, | |
"type": entity_type, | |
"properties": properties | |
}) | |
# Create Plotly figure for horizontal path | |
fig = go.Figure() | |
# Add nodes | |
node_x = [] | |
node_y = [] | |
node_text = [] | |
node_hover = [] | |
node_colors = [] | |
# Color mapping for entity types | |
color_map = {} | |
for node in nodes: | |
node_type = node["type"] | |
if node_type not in color_map: | |
# Assign colors from a categorical colorscale | |
idx = len(color_map) % len(px.colors.qualitative.Plotly) | |
color_map[node_type] = px.colors.qualitative.Plotly[idx] | |
for node in nodes: | |
node_x.append(x_positions[node["id"]]) | |
node_y.append(0) # All nodes at y=0 for a horizontal path | |
node_text.append(node["name"]) | |
# Create detailed hover text | |
hover = f"{node['id']}<br>{node['type']}" | |
for k, v in node["properties"].items(): | |
hover += f"<br>{k}: {v}" | |
node_hover.append(hover) | |
# Set node color by type | |
node_colors.append(color_map.get(node["type"], "#7f7f7f")) | |
# Add node trace | |
fig.add_trace(go.Scatter( | |
x=node_x, | |
y=node_y, | |
mode="markers+text", | |
marker=dict( | |
size=30, | |
color=node_colors, | |
line=dict(width=2, color="DarkSlateGrey") | |
), | |
text=node_text, | |
textposition="bottom center", | |
hovertext=node_hover, | |
hoverinfo="text", | |
name="Entities" | |
)) | |
# Add edges with relationship labels | |
for edge in path: | |
source = edge["source"] | |
target = edge["target"] | |
edge_type = edge["type"] | |
source_pos = x_positions[source] | |
target_pos = x_positions[target] | |
# Add edge line | |
fig.add_trace(go.Scatter( | |
x=[source_pos, target_pos], | |
y=[0, 0], | |
mode="lines", | |
line=dict(width=2, color="#888"), | |
hoverinfo="none", | |
showlegend=False | |
)) | |
# Add relationship label above the line | |
fig.add_trace(go.Scatter( | |
x=[(source_pos + target_pos) / 2], | |
y=[0.1], # Slightly above the line | |
mode="text", | |
text=[edge_type], | |
textposition="top center", | |
hoverinfo="none", | |
showlegend=False | |
)) | |
# Update layout | |
fig.update_layout( | |
title="Path Visualization", | |
showlegend=False, | |
hovermode="closest", | |
margin=dict(b=40, l=20, r=20, t=40), | |
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
height=300, | |
plot_bgcolor="white" | |
) | |
# Add a legend for entity types | |
for entity_type, color in color_map.items(): | |
fig.add_trace(go.Scatter( | |
x=[None], | |
y=[None], | |
mode="markers", | |
marker=dict(size=10, color=color), | |
name=entity_type, | |
showlegend=True | |
)) | |
fig.update_layout(legend=dict( | |
orientation="h", | |
yanchor="bottom", | |
y=-0.3, | |
xanchor="center", | |
x=0.5 | |
)) | |
st.plotly_chart(fig, use_container_width=True) | |
# Add step-by-step description | |
st.markdown("### Step-by-Step Path") | |
for i, edge in enumerate(path): | |
source = edge["source"] | |
target = edge["target"] | |
relation = edge["type"] | |
# Get display names | |
source_info = entities.get(source, {}) | |
target_info = entities.get(target, {}) | |
source_name = source | |
if "properties" in source_info and "name" in source_info["properties"]: | |
source_name = source_info["properties"]["name"] | |
target_name = target | |
if "properties" in target_info and "name" in target_info["properties"]: | |
target_name = target_info["properties"]["name"] | |
st.markdown(f"**Step {i+1}:** {source_name} ({source}) **{relation}** {target_name} ({target})") | |
def find_relevant_rules_for_path(path, ontology_manager): | |
"""Find business rules relevant to the entities and relationships in a path.""" | |
rules = ontology_manager.ontology_data.get("rules", []) | |
if not rules: | |
return [] | |
# Extract entities and relationships from the path | |
entity_types = set() | |
relationship_types = set() | |
for edge in path: | |
source = edge["source"] | |
target = edge["target"] | |
relation = edge["type"] | |
# Get entity info | |
source_info = ontology_manager.get_entity_info(source) | |
target_info = ontology_manager.get_entity_info(target) | |
# Add entity types | |
if "class_type" in source_info: | |
entity_types.add(source_info["class_type"]) | |
if "class_type" in target_info: | |
entity_types.add(target_info["class_type"]) | |
# Add relationship type | |
relationship_types.add(relation) | |
# Find rules that mention these entities or relationships | |
relevant_rules = [] | |
for rule in rules: | |
rule_text = json.dumps(rule).lower() | |
# Check if rule mentions any of the entity types or relationships | |
is_relevant = False | |
for entity_type in entity_types: | |
if entity_type.lower() in rule_text: | |
is_relevant = True | |
break | |
if not is_relevant: | |
for rel_type in relationship_types: | |
if rel_type.lower() in rule_text: | |
is_relevant = True | |
break | |
if is_relevant: | |
relevant_rules.append(rule) | |
return relevant_rules |