Spaces:
Running
Running
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Rhizome | |
# Version beta 0.0, August 2023 | |
# Property of IBM Research, Accelerated Discovery | |
# | |
""" | |
PLEASE NOTE THIS IMPLEMENTATION INCLUDES THE ORIGINAL SOURCE CODE (AND SOME ADAPTATIONS) | |
OF THE MHG IMPLEMENTATION OF HIROSHI KAJINO AT IBM TRL ALREADY PUBLICLY AVAILABLE. | |
THIS MIGHT INFLUENCE THE DECISION OF THE FINAL LICENSE SO CAREFUL CHECK NEEDS BE DONE. | |
""" | |
""" Title """ | |
__author__ = "Hiroshi Kajino <[email protected]>" | |
__copyright__ = "(c) Copyright IBM Corp. 2018" | |
__version__ = "0.1" | |
__date__ = "Jan 31 2018" | |
from copy import deepcopy | |
from typing import List, Dict, Tuple | |
import networkx as nx | |
import numpy as np | |
import os | |
class Hypergraph(object):
    '''
    A class of a hypergraph.

    The hypergraph is stored as a bipartite graph: both hypergraph nodes and
    hyperedges are vertices of `hg`, and a vertex pair (hyperedge, node) is
    connected when the node belongs to the hyperedge.
    Each hyperedge can be ordered. For the ordered case,
    edges adjacent to the hyperedge node are labeled by their orders
    (attribute `order`); unordered hyperedges use `order=-1`.

    Naming convention relied upon by this class
    -------------------------------------------
    * `nodes_in_edge` treats any name starting with 'e' as a hyperedge.
    * `divide` treats names starting with 'bond_' as nodes and names
      starting with 'e' as hyperedges.
    Node names must therefore not start with 'e'.

    Attributes
    ----------
    hg : nx.Graph
        a bipartite graph representation of a hypergraph
    edge_idx : int
        counter used to generate default hyperedge names ('e0', 'e1', ...).
        it is incremented on every successful `add_edge` and never decremented.
    nodes : set
        names of the nodes
    num_nodes : int
        number of nodes
    edges : set
        names of the hyperedges
    num_edges : int
        number of hyperedges
    nodes_in_edge_dict : dict
        maps a hyperedge name to the `node_list` object given to `add_edge`
    '''

    def __init__(self):
        self.hg = nx.Graph()
        self.edge_idx = 0
        self.nodes = set()
        self.num_nodes = 0
        self.edges = set()
        self.num_edges = 0
        self.nodes_in_edge_dict = {}

    def add_node(self, node: str, attr_dict=None):
        ''' add a node to hypergraph

        Parameters
        ----------
        node : str
            node name
        attr_dict : dict
            dictionary of node attributes
        '''
        self.hg.add_node(node, bipartite='node', attr_dict=attr_dict)
        if node not in self.nodes:
            self.num_nodes += 1
        self.nodes.add(node)

    def add_edge(self, node_list: List[str], attr_dict=None, edge_name=None):
        ''' add an edge consisting of nodes `node_list`

        Parameters
        ----------
        node_list : list or set
            nodes that consist the edge. a list makes the edge ordered
            (the position of each node is stored as `order`), a set makes it
            unordered (`order` is -1).
        attr_dict : dict
            dictionary of edge attributes
        edge_name : str
            name of the edge. if None, 'e<edge_idx>' is used.

        Returns
        -------
        str
            name of the added edge

        Raises
        ------
        ValueError
            if `node_list` is neither a list nor a set.
            the hypergraph is left untouched in that case.
        '''
        # Validate before touching any state, so that an invalid call cannot
        # leave a half-registered hyperedge behind.
        if isinstance(node_list, list):
            is_ordered = True
        elif isinstance(node_list, set):
            is_ordered = False
        else:
            raise ValueError(
                'node_list must be a list (ordered) or a set (unordered), '
                'got {}'.format(type(node_list).__name__))

        if edge_name is None:
            edge = 'e{}'.format(self.edge_idx)
        else:
            assert edge_name not in self.edges
            edge = edge_name

        self.hg.add_node(edge, bipartite='edge', attr_dict=attr_dict)
        if edge not in self.edges:
            self.num_edges += 1
        self.edges.add(edge)
        self.nodes_in_edge_dict[edge] = node_list

        for node_idx, each_node in enumerate(node_list):
            self.hg.add_edge(edge, each_node,
                             order=node_idx if is_ordered else -1)
            if each_node not in self.nodes:
                self.num_nodes += 1
            self.nodes.add(each_node)
        self.edge_idx += 1
        return edge

    def remove_node(self, node: str, remove_connected_edges=True):
        ''' remove a node

        Parameters
        ----------
        node : str
            node name
        remove_connected_edges : bool
            if True, remove edges that are adjacent to the node.
            if False, the edges are kept; note that `nodes_in_edge_dict`
            is not updated, so it keeps listing `node` for those edges.
        '''
        if remove_connected_edges:
            # Snapshot the names: `remove_edge` mutates the adjacency that
            # is being iterated.
            for each_edge in list(self.adj_edges(node)):
                self.remove_edge(each_edge)
        self.hg.remove_node(node)
        self.num_nodes -= 1
        self.nodes.remove(node)

    def remove_nodes(self, node_iter, remove_connected_edges=True):
        ''' remove a set of nodes

        Parameters
        ----------
        node_iter : iterator of strings
            nodes to be removed
        remove_connected_edges : bool
            if True, remove edges that are adjacent to the node
        '''
        # Snapshot so that passing `self.nodes` itself is safe.
        for each_node in list(node_iter):
            self.remove_node(each_node, remove_connected_edges)

    def remove_edge(self, edge: str):
        ''' remove an edge

        Parameters
        ----------
        edge : str
            edge to be removed
        '''
        self.hg.remove_node(edge)
        self.edges.remove(edge)
        self.num_edges -= 1
        self.nodes_in_edge_dict.pop(edge)

    def remove_edges(self, edge_iter):
        ''' remove a set of edges

        Parameters
        ----------
        edge_iter : iterator of strings
            edges to be removed
        '''
        # Snapshot so that passing `self.edges` itself is safe.
        for each_edge in list(edge_iter):
            self.remove_edge(each_edge)

    def remove_edges_with_attr(self, edge_attr_dict):
        ''' remove every edge whose attributes match `edge_attr_dict`

        Parameters
        ----------
        edge_attr_dict : dict
            an edge is removed if, for every key in this dict, the edge has
            that attribute and its value is equal to the given one.
            an empty dict matches every edge.
        '''
        remove_edge_list = []
        for each_edge in self.edges:
            satisfy = True
            for each_key, each_val in edge_attr_dict.items():
                try:
                    if self.edge_attr(each_edge)[each_key] != each_val:
                        satisfy = False
                except KeyError:
                    # a missing attribute counts as a mismatch
                    satisfy = False
                if not satisfy:
                    break
            if satisfy:
                remove_edge_list.append(each_edge)
        self.remove_edges(remove_edge_list)

    def remove_subhg(self, subhg):
        ''' remove subhypergraph.
        all of the hyperedges are removed.
        each node of subhg is removed if its degree becomes 0 after removing hyperedges.

        Parameters
        ----------
        subhg : Hypergraph
        '''
        # Snapshots keep the loops valid even when `subhg` is `self`.
        for each_edge in list(subhg.edges):
            self.remove_edge(each_edge)
        for each_node in list(subhg.nodes):
            if self.degree(each_node) == 0:
                self.remove_node(each_node)

    def nodes_in_edge(self, edge):
        ''' return an ordered list of nodes in a given edge.

        If `edge` does not start with 'e', its neighbors in the bipartite
        graph are returned instead; `adj_nodes` relies on this when it is
        called with a hyperedge name.

        Parameters
        ----------
        edge : str
            edge whose nodes are returned

        Returns
        -------
        list or set
            ordered list or set of nodes that belong to the edge
        '''
        if edge.startswith('e'):
            return self.nodes_in_edge_dict[edge]

        adj_node_list = self.hg.adj[edge]
        adj_node_order_list = []
        adj_node_name_list = []
        for each_node in adj_node_list:
            adj_node_order_list.append(adj_node_list[each_node]['order'])
            adj_node_name_list.append(each_node)
        # all orders equal to -1 means every incidence is unordered
        if adj_node_order_list == [-1] * len(adj_node_order_list):
            return set(adj_node_name_list)
        return [adj_node_name_list[each_idx] for each_idx
                in np.argsort(adj_node_order_list)]

    def adj_edges(self, node):
        ''' return the hyperedges adjacent to `node`

        Parameters
        ----------
        node : str

        Returns
        -------
        adjacency view of `hg`
            `self.hg.adj[node]`; iterating it yields the names of the
            hyperedges adjacent to `node`. it is a live view of the graph,
            so take a copy (e.g. `list(...)`) before modifying the hypergraph.
        '''
        return self.hg.adj[node]

    def adj_nodes(self, node):
        ''' return a set of adjacent nodes

        This class also calls the method with a hyperedge name
        (see `get_leaf_edge` and `draw`), in which case the result is
        used as the set of hyperedges sharing a node with it.

        Parameters
        ----------
        node : str

        Returns
        -------
        set
            set of nodes that are adjacent to `node`
        '''
        node_set = set()
        for each_adj_edge in self.adj_edges(node):
            node_set.update(self.nodes_in_edge(each_adj_edge))
        node_set.discard(node)
        return node_set

    def has_edge(self, node_list, ignore_order=False):
        ''' look for an edge consisting of `node_list`

        Parameters
        ----------
        node_list : list or set
            nodes of the edge to look for
        ignore_order : bool
            if True, edges are compared as sets of nodes

        Returns
        -------
        str or bool
            name of a matching edge if one exists, False otherwise
        '''
        for each_edge in self.edges:
            if ignore_order:
                if set(self.nodes_in_edge(each_edge)) == set(node_list):
                    return each_edge
            else:
                if self.nodes_in_edge(each_edge) == node_list:
                    return each_edge
        return False

    def degree(self, node):
        ''' return the number of hyperedges adjacent to `node` '''
        return len(self.hg.adj[node])

    def degrees(self):
        ''' return a dict mapping each node to its degree '''
        return {each_node: self.degree(each_node) for each_node in self.nodes}

    def edge_degree(self, edge):
        ''' return the number of nodes in `edge` '''
        return len(self.nodes_in_edge(edge))

    def edge_degrees(self):
        ''' return a dict mapping each edge to the number of its nodes '''
        return {each_edge: self.edge_degree(each_edge) for each_edge in self.edges}

    def is_adj(self, node1, node2):
        ''' return True if `node1` and `node2` share a hyperedge '''
        return node1 in self.adj_nodes(node2)

    def adj_subhg(self, node, ident_node_dict=None):
        """ return a subhypergraph consisting of a set of nodes and hyperedges adjacent to `node`.
        if an adjacent node has a self-loop hyperedge, it will be also added to the subhypergraph.

        Attribute dicts are shared with this hypergraph (they are not copied).

        Parameters
        ----------
        node : str
        ident_node_dict : dict
            dict containing identical nodes. see `get_identical_node_dict` for more details

        Returns
        -------
        subhg : Hypergraph
        """
        if ident_node_dict is None:
            ident_node_dict = self.get_identical_node_dict()
        adj_node_set = set(ident_node_dict[node])
        adj_edge_set = set()
        for each_node in ident_node_dict[node]:
            adj_edge_set.update(self.adj_edges(each_node))

        # Iterate over a snapshot because `adj_edge_set` grows in the loop.
        for each_edge in set(adj_edge_set):
            other_nodes = self.nodes_in_edge(each_edge)
            adj_node_set.update(other_nodes)
            nodes_of_edge = set(other_nodes)
            # if the adjacent node has self-loop edge, it will be appended to adj_edge_set.
            for each_node in other_nodes:
                for other_edge in set(self.adj_edges(each_node)) - {each_edge}:
                    # `other_edge` only touches nodes of `each_edge`
                    if not set(self.nodes_in_edge(other_edge)) - nodes_of_edge:
                        adj_edge_set.add(other_edge)

        subhg = Hypergraph()
        for each_node in adj_node_set:
            subhg.add_node(each_node, attr_dict=self.node_attr(each_node))
        for each_edge in adj_edge_set:
            subhg.add_edge(self.nodes_in_edge(each_edge),
                           attr_dict=self.edge_attr(each_edge),
                           edge_name=each_edge)
        # keep generating fresh default edge names in the subhypergraph
        subhg.edge_idx = self.edge_idx
        return subhg

    def get_subhg(self, node_list, edge_list, ident_node_dict=None):
        """ return a subhypergraph consisting of the given nodes and hyperedges.
        nodes identical to those in `node_list` are included as well.
        node and edge attribute dicts are deep-copied.

        Parameters
        ----------
        node_list : list of str
            nodes to be included
        edge_list : list of str
            hyperedges to be included
        ident_node_dict : dict
            dict containing identical nodes. see `get_identical_node_dict` for more details

        Returns
        -------
        subhg : Hypergraph
        """
        if ident_node_dict is None:
            ident_node_dict = self.get_identical_node_dict()
        adj_node_set = set()
        for each_node in node_list:
            adj_node_set.update(ident_node_dict[each_node])
        adj_edge_set = set(edge_list)

        subhg = Hypergraph()
        for each_node in adj_node_set:
            subhg.add_node(each_node,
                           attr_dict=deepcopy(self.node_attr(each_node)))
        for each_edge in adj_edge_set:
            subhg.add_edge(self.nodes_in_edge(each_edge),
                           attr_dict=deepcopy(self.edge_attr(each_edge)),
                           edge_name=each_edge)
        # keep generating fresh default edge names in the subhypergraph
        subhg.edge_idx = self.edge_idx
        return subhg

    def copy(self):
        ''' return a copy of the object

        Returns
        -------
        Hypergraph
        '''
        return deepcopy(self)

    def node_attr(self, node):
        ''' return the attribute dict given when `node` was added '''
        return self.hg.nodes[node]['attr_dict']

    def edge_attr(self, edge):
        ''' return the attribute dict given when `edge` was added '''
        return self.hg.nodes[edge]['attr_dict']

    def set_node_attr(self, node, attr_dict):
        ''' set each (key, value) pair of `attr_dict` in the attributes of `node` '''
        for each_key, each_val in attr_dict.items():
            self.hg.nodes[node]['attr_dict'][each_key] = each_val

    def set_edge_attr(self, edge, attr_dict):
        ''' set each (key, value) pair of `attr_dict` in the attributes of `edge` '''
        for each_key, each_val in attr_dict.items():
            self.hg.nodes[edge]['attr_dict'][each_key] = each_val

    def get_identical_node_dict(self):
        ''' get identical nodes
        nodes are identical if they share the same set of adjacent edges.
        a node without any adjacent edge is identical only to itself.

        Returns
        -------
        ident_node_dict : dict
            ident_node_dict[node] returns a list of nodes that are identical to `node`.
        '''
        ident_node_dict = {}
        for each_node in self.nodes:
            # Invariant over the inner loop, so look it up once.
            adj_of_node = self.adj_edges(each_node)
            has_adj_edge = len(adj_of_node) != 0
            # NOTE(review): adjacency views compare as mappings, so the
            # per-incidence data (`order`) presumably takes part in this
            # comparison as well -- confirm against networkx.
            ident_node_dict[each_node] = [
                each_other_node for each_other_node in self.nodes
                if each_other_node == each_node
                or (has_adj_edge
                    and adj_of_node == self.adj_edges(each_other_node))]
        return ident_node_dict

    def get_leaf_edge(self):
        ''' get an edge that is incident only to one edge

        Returns
        -------
        if exists, return a leaf edge without the 'tmp' attribute.
        otherwise, return None.
        '''
        for each_edge in self.edges:
            if len(self.adj_nodes(each_edge)) == 1:
                if 'tmp' not in self.edge_attr(each_edge):
                    return each_edge
        return None

    def get_nontmp_edge(self):
        ''' return an edge without the 'tmp' attribute if exists, None otherwise '''
        for each_edge in self.edges:
            if 'tmp' not in self.edge_attr(each_edge):
                return each_edge
        return None

    def is_subhg(self, hg):
        ''' return whether this hypergraph is a subhypergraph of `hg`

        Only the names of nodes and edges are compared.

        Returns
        -------
        True if self \\in hg,
        False otherwise.
        '''
        if any(each_node not in hg.nodes for each_node in self.nodes):
            return False
        return all(each_edge in hg.edges for each_edge in self.edges)

    def in_cycle(self, node, visited=None, parent='', root_node='') -> bool:
        ''' if `node` is in a cycle, then return True. otherwise, False.

        Parameters
        ----------
        node : str
            node in a hypergraph
        visited : list
            list of visited nodes, used for recursion
        parent : str
            parent node, used to eliminate a cycle consisting of two nodes and one edge.
        root_node : str
            node the search started from, used for recursion

        Returns
        -------
        bool
        '''
        if visited is None:
            visited = []
        if parent == '':
            # top-level call: always start from a fresh list
            visited = []
        if root_node == '':
            root_node = node
        visited.append(node)
        for each_adj_node in self.adj_nodes(node):
            if each_adj_node not in visited:
                if self.in_cycle(each_adj_node, visited, node, root_node):
                    return True
            elif each_adj_node != parent and each_adj_node == root_node:
                return True
        return False

    def draw(self, file_path=None, with_node=False, with_edge_name=False):
        ''' draw hypergraph

        Parameters
        ----------
        file_path : str
            if not None, the drawing is rendered to this path
        with_node : bool
            if True, every node is drawn and connected to its hyperedges.
            if False, only nodes having 'ext_id' are drawn, and hyperedges
            sharing nodes are connected directly, once per unit of bond type.
        with_edge_name : bool
            if True, the name of each hyperedge is appended to its label

        Returns
        -------
        graphviz.Graph

        Raises
        ------
        NotImplementedError
            if a shared node has a bond type other than 1, 2, 3, or 12
        '''
        import graphviz
        G = graphviz.Graph(format='png')
        for each_node in self.nodes:
            if 'ext_id' in self.node_attr(each_node):
                G.node(each_node, label='',
                       shape='circle', width='0.1', height='0.1', style='filled',
                       fillcolor='black')
            elif with_node:
                G.node(each_node, label='',
                       shape='circle', width='0.1', height='0.1', style='filled',
                       fillcolor='gray')

        # unordered pairs that are already connected in the drawing
        drawn_pair_set = set()
        for each_edge in self.edges:
            each_attr = self.edge_attr(each_edge)
            if each_attr.get('terminal', False):
                label, style_kwargs = each_attr['symbol'].symbol, {}
            elif each_attr.get('tmp', False):
                label, style_kwargs = 'tmp', {}
            else:
                label, style_kwargs = each_attr['symbol'].symbol, {'style': 'filled'}
            if with_edge_name:
                label = label + ', ' + each_edge
            G.node(each_edge, label=label,
                   fontcolor='black', shape='square', **style_kwargs)

            if with_node:
                for each_node in self.nodes_in_edge(each_edge):
                    G.edge(each_edge, each_node)
            else:
                for each_node in self.nodes_in_edge(each_edge):
                    each_pair = frozenset([each_node, each_edge])
                    if 'ext_id' in self.node_attr(each_node) \
                       and each_pair not in drawn_pair_set:
                        G.edge(each_edge, each_node)
                        drawn_pair_set.add(each_pair)
                for each_other_edge in self.adj_nodes(each_edge):
                    each_pair = frozenset([each_edge, each_other_edge])
                    if each_pair in drawn_pair_set:
                        continue
                    num_bond = 0
                    common_node_set = set(self.nodes_in_edge(each_edge)) \
                        .intersection(self.nodes_in_edge(each_other_edge))
                    for each_node in common_node_set:
                        bond_type = self.node_attr(each_node)['symbol'].bond_type
                        if bond_type in [1, 2, 3]:
                            num_bond += bond_type
                        elif bond_type in [12]:
                            num_bond += 1
                        else:
                            raise NotImplementedError('unsupported bond type')
                    for _ in range(num_bond):
                        G.edge(each_edge, each_other_edge)
                    drawn_pair_set.add(each_pair)
        if file_path is not None:
            G.render(file_path, cleanup=True)
        return G

    def is_dividable(self, node):
        ''' return True if removing `node` disconnects the bipartite graph '''
        # Only the structure matters here, so copy the graph without
        # deep-copying the attribute payloads.
        _hg = self.hg.copy()
        _hg.remove_node(node)
        return (not nx.is_connected(_hg))

    def divide(self, node):
        ''' divide the hypergraph at `node`

        Parameters
        ----------
        node : str
            node at which the hypergraph is divided

        Returns
        -------
        list of Hypergraph
            one subhypergraph per connected component obtained by removing
            `node`; each of them contains `node`, the members of the
            component whose names start with 'bond_' as nodes, and those
            whose names start with 'e' as hyperedges.
        '''
        # Only the structure matters here, so copy the graph without
        # deep-copying the attribute payloads.
        hg_wo_node = self.hg.copy()
        hg_wo_node.remove_node(node)
        # `self` is not modified below, so compute this once for all components.
        ident_node_dict = self.get_identical_node_dict()

        subhg_list = []
        for each_component in nx.connected_components(hg_wo_node):
            node_list = [node]
            node_list.extend([each_node for each_node in each_component
                              if each_node.startswith('bond_')])
            edge_list = [each_edge for each_edge in each_component
                         if each_edge.startswith('e')]
            subhg_list.append(
                self.get_subhg(node_list, edge_list, ident_node_dict))
        return subhg_list