|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
PLEASE NOTE THIS IMPLEMENTATION INCLUDES THE ORIGINAL SOURCE CODE (AND SOME ADAPTATIONS) |
|
OF THE MHG IMPLEMENTATION OF HIROSHI KAJINO AT IBM TRL ALREADY PUBLICLY AVAILABLE. |
|
THIS MIGHT INFLUENCE THE DECISION OF THE FINAL LICENSE SO CAREFUL CHECK NEEDS BE DONE. |
|
""" |
|
|
|
""" Title """ |
|
|
|
__author__ = "Hiroshi Kajino <[email protected]>" |
|
__copyright__ = "(c) Copyright IBM Corp. 2018" |
|
__version__ = "0.1" |
|
__date__ = "Jan 31 2018" |
|
|
|
from copy import deepcopy |
|
from typing import List, Dict, Tuple |
|
import networkx as nx |
|
import numpy as np |
|
import os |
|
|
|
|
|
class Hypergraph(object): |
|
''' |
|
A class of a hypergraph. |
|
Each hyperedge can be ordered. For the ordered case, |
|
edges adjacent to the hyperedge node are labeled by their orders. |
|
|
|
Attributes |
|
---------- |
|
hg : nx.Graph |
|
a bipartite graph representation of a hypergraph |
|
edge_idx : int |
|
total number of hyperedges that exist so far |
|
''' |
|
def __init__(self): |
|
self.hg = nx.Graph() |
|
self.edge_idx = 0 |
|
self.nodes = set([]) |
|
self.num_nodes = 0 |
|
self.edges = set([]) |
|
self.num_edges = 0 |
|
self.nodes_in_edge_dict = {} |
|
|
|
def add_node(self, node: str, attr_dict=None): |
|
''' add a node to hypergraph |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
node name |
|
attr_dict : dict |
|
dictionary of node attributes |
|
''' |
|
self.hg.add_node(node, bipartite='node', attr_dict=attr_dict) |
|
if node not in self.nodes: |
|
self.num_nodes += 1 |
|
self.nodes.add(node) |
|
|
|
def add_edge(self, node_list: List[str], attr_dict=None, edge_name=None): |
|
''' add an edge consisting of nodes `node_list` |
|
|
|
Parameters |
|
---------- |
|
node_list : list |
|
ordered list of nodes that consist the edge |
|
attr_dict : dict |
|
dictionary of edge attributes |
|
''' |
|
if edge_name is None: |
|
edge = 'e{}'.format(self.edge_idx) |
|
else: |
|
assert edge_name not in self.edges |
|
edge = edge_name |
|
self.hg.add_node(edge, bipartite='edge', attr_dict=attr_dict) |
|
if edge not in self.edges: |
|
self.num_edges += 1 |
|
self.edges.add(edge) |
|
self.nodes_in_edge_dict[edge] = node_list |
|
if type(node_list) == list: |
|
for node_idx, each_node in enumerate(node_list): |
|
self.hg.add_edge(edge, each_node, order=node_idx) |
|
if each_node not in self.nodes: |
|
self.num_nodes += 1 |
|
self.nodes.add(each_node) |
|
|
|
elif type(node_list) == set: |
|
for each_node in node_list: |
|
self.hg.add_edge(edge, each_node, order=-1) |
|
if each_node not in self.nodes: |
|
self.num_nodes += 1 |
|
self.nodes.add(each_node) |
|
else: |
|
raise ValueError |
|
self.edge_idx += 1 |
|
return edge |
|
|
|
def remove_node(self, node: str, remove_connected_edges=True): |
|
''' remove a node |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
node name |
|
remove_connected_edges : bool |
|
if True, remove edges that are adjacent to the node |
|
''' |
|
if remove_connected_edges: |
|
connected_edges = deepcopy(self.adj_edges(node)) |
|
for each_edge in connected_edges: |
|
self.remove_edge(each_edge) |
|
self.hg.remove_node(node) |
|
self.num_nodes -= 1 |
|
self.nodes.remove(node) |
|
|
|
def remove_nodes(self, node_iter, remove_connected_edges=True): |
|
''' remove a set of nodes |
|
|
|
Parameters |
|
---------- |
|
node_iter : iterator of strings |
|
nodes to be removed |
|
remove_connected_edges : bool |
|
if True, remove edges that are adjacent to the node |
|
''' |
|
for each_node in node_iter: |
|
self.remove_node(each_node, remove_connected_edges) |
|
|
|
def remove_edge(self, edge: str): |
|
''' remove an edge |
|
|
|
Parameters |
|
---------- |
|
edge : str |
|
edge to be removed |
|
''' |
|
self.hg.remove_node(edge) |
|
self.edges.remove(edge) |
|
self.num_edges -= 1 |
|
self.nodes_in_edge_dict.pop(edge) |
|
|
|
def remove_edges(self, edge_iter): |
|
''' remove a set of edges |
|
|
|
Parameters |
|
---------- |
|
edge_iter : iterator of strings |
|
edges to be removed |
|
''' |
|
for each_edge in edge_iter: |
|
self.remove_edge(each_edge) |
|
|
|
def remove_edges_with_attr(self, edge_attr_dict): |
|
remove_edge_list = [] |
|
for each_edge in self.edges: |
|
satisfy = True |
|
for each_key, each_val in edge_attr_dict.items(): |
|
if not satisfy: |
|
break |
|
try: |
|
if self.edge_attr(each_edge)[each_key] != each_val: |
|
satisfy = False |
|
except KeyError: |
|
satisfy = False |
|
if satisfy: |
|
remove_edge_list.append(each_edge) |
|
self.remove_edges(remove_edge_list) |
|
|
|
def remove_subhg(self, subhg): |
|
''' remove subhypergraph. |
|
all of the hyperedges are removed. |
|
each node of subhg is removed if its degree becomes 0 after removing hyperedges. |
|
|
|
Parameters |
|
---------- |
|
subhg : Hypergraph |
|
''' |
|
for each_edge in subhg.edges: |
|
self.remove_edge(each_edge) |
|
for each_node in subhg.nodes: |
|
if self.degree(each_node) == 0: |
|
self.remove_node(each_node) |
|
|
|
def nodes_in_edge(self, edge): |
|
''' return an ordered list of nodes in a given edge. |
|
|
|
Parameters |
|
---------- |
|
edge : str |
|
edge whose nodes are returned |
|
|
|
Returns |
|
------- |
|
list or set |
|
ordered list or set of nodes that belong to the edge |
|
''' |
|
if edge.startswith('e'): |
|
return self.nodes_in_edge_dict[edge] |
|
else: |
|
adj_node_list = self.hg.adj[edge] |
|
adj_node_order_list = [] |
|
adj_node_name_list = [] |
|
for each_node in adj_node_list: |
|
adj_node_order_list.append(adj_node_list[each_node]['order']) |
|
adj_node_name_list.append(each_node) |
|
if adj_node_order_list == [-1] * len(adj_node_order_list): |
|
return set(adj_node_name_list) |
|
else: |
|
return [adj_node_name_list[each_idx] for each_idx |
|
in np.argsort(adj_node_order_list)] |
|
|
|
def adj_edges(self, node): |
|
''' return a dict of adjacent hyperedges |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
|
|
Returns |
|
------- |
|
set |
|
set of edges that are adjacent to `node` |
|
''' |
|
return self.hg.adj[node] |
|
|
|
def adj_nodes(self, node): |
|
''' return a set of adjacent nodes |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
|
|
Returns |
|
------- |
|
set |
|
set of nodes that are adjacent to `node` |
|
''' |
|
node_set = set([]) |
|
for each_adj_edge in self.adj_edges(node): |
|
node_set.update(set(self.nodes_in_edge(each_adj_edge))) |
|
node_set.discard(node) |
|
return node_set |
|
|
|
def has_edge(self, node_list, ignore_order=False): |
|
for each_edge in self.edges: |
|
if ignore_order: |
|
if set(self.nodes_in_edge(each_edge)) == set(node_list): |
|
return each_edge |
|
else: |
|
if self.nodes_in_edge(each_edge) == node_list: |
|
return each_edge |
|
return False |
|
|
|
def degree(self, node): |
|
return len(self.hg.adj[node]) |
|
|
|
def degrees(self): |
|
return {each_node: self.degree(each_node) for each_node in self.nodes} |
|
|
|
def edge_degree(self, edge): |
|
return len(self.nodes_in_edge(edge)) |
|
|
|
def edge_degrees(self): |
|
return {each_edge: self.edge_degree(each_edge) for each_edge in self.edges} |
|
|
|
def is_adj(self, node1, node2): |
|
return node1 in self.adj_nodes(node2) |
|
|
|
def adj_subhg(self, node, ident_node_dict=None): |
|
""" return a subhypergraph consisting of a set of nodes and hyperedges adjacent to `node`. |
|
if an adjacent node has a self-loop hyperedge, it will be also added to the subhypergraph. |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
ident_node_dict : dict |
|
dict containing identical nodes. see `get_identical_node_dict` for more details |
|
|
|
Returns |
|
------- |
|
subhg : Hypergraph |
|
""" |
|
if ident_node_dict is None: |
|
ident_node_dict = self.get_identical_node_dict() |
|
adj_node_set = set(ident_node_dict[node]) |
|
adj_edge_set = set([]) |
|
for each_node in ident_node_dict[node]: |
|
adj_edge_set.update(set(self.adj_edges(each_node))) |
|
fixed_adj_edge_set = deepcopy(adj_edge_set) |
|
for each_edge in fixed_adj_edge_set: |
|
other_nodes = self.nodes_in_edge(each_edge) |
|
adj_node_set.update(other_nodes) |
|
|
|
|
|
for each_node in other_nodes: |
|
for other_edge in set(self.adj_edges(each_node)) - set([each_edge]): |
|
if len(set(self.nodes_in_edge(other_edge)) \ |
|
- set(self.nodes_in_edge(each_edge))) == 0: |
|
adj_edge_set.update(set([other_edge])) |
|
subhg = Hypergraph() |
|
for each_node in adj_node_set: |
|
subhg.add_node(each_node, attr_dict=self.node_attr(each_node)) |
|
for each_edge in adj_edge_set: |
|
subhg.add_edge(self.nodes_in_edge(each_edge), |
|
attr_dict=self.edge_attr(each_edge), |
|
edge_name=each_edge) |
|
subhg.edge_idx = self.edge_idx |
|
return subhg |
|
|
|
def get_subhg(self, node_list, edge_list, ident_node_dict=None): |
|
""" return a subhypergraph consisting of a set of nodes and hyperedges adjacent to `node`. |
|
if an adjacent node has a self-loop hyperedge, it will be also added to the subhypergraph. |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
ident_node_dict : dict |
|
dict containing identical nodes. see `get_identical_node_dict` for more details |
|
|
|
Returns |
|
------- |
|
subhg : Hypergraph |
|
""" |
|
if ident_node_dict is None: |
|
ident_node_dict = self.get_identical_node_dict() |
|
adj_node_set = set([]) |
|
for each_node in node_list: |
|
adj_node_set.update(set(ident_node_dict[each_node])) |
|
adj_edge_set = set(edge_list) |
|
|
|
subhg = Hypergraph() |
|
for each_node in adj_node_set: |
|
subhg.add_node(each_node, |
|
attr_dict=deepcopy(self.node_attr(each_node))) |
|
for each_edge in adj_edge_set: |
|
subhg.add_edge(self.nodes_in_edge(each_edge), |
|
attr_dict=deepcopy(self.edge_attr(each_edge)), |
|
edge_name=each_edge) |
|
subhg.edge_idx = self.edge_idx |
|
return subhg |
|
|
|
def copy(self): |
|
''' return a copy of the object |
|
|
|
Returns |
|
------- |
|
Hypergraph |
|
''' |
|
return deepcopy(self) |
|
|
|
def node_attr(self, node): |
|
return self.hg.nodes[node]['attr_dict'] |
|
|
|
def edge_attr(self, edge): |
|
return self.hg.nodes[edge]['attr_dict'] |
|
|
|
def set_node_attr(self, node, attr_dict): |
|
for each_key, each_val in attr_dict.items(): |
|
self.hg.nodes[node]['attr_dict'][each_key] = each_val |
|
|
|
def set_edge_attr(self, edge, attr_dict): |
|
for each_key, each_val in attr_dict.items(): |
|
self.hg.nodes[edge]['attr_dict'][each_key] = each_val |
|
|
|
def get_identical_node_dict(self): |
|
''' get identical nodes |
|
nodes are identical if they share the same set of adjacent edges. |
|
|
|
Returns |
|
------- |
|
ident_node_dict : dict |
|
ident_node_dict[node] returns a list of nodes that are identical to `node`. |
|
''' |
|
ident_node_dict = {} |
|
for each_node in self.nodes: |
|
ident_node_list = [] |
|
for each_other_node in self.nodes: |
|
if each_other_node == each_node: |
|
ident_node_list.append(each_other_node) |
|
elif self.adj_edges(each_node) == self.adj_edges(each_other_node) \ |
|
and len(self.adj_edges(each_node)) != 0: |
|
ident_node_list.append(each_other_node) |
|
ident_node_dict[each_node] = ident_node_list |
|
return ident_node_dict |
|
''' |
|
ident_node_dict = {} |
|
for each_node in self.nodes: |
|
ident_node_dict[each_node] = [each_node] |
|
return ident_node_dict |
|
''' |
|
|
|
def get_leaf_edge(self): |
|
''' get an edge that is incident only to one edge |
|
|
|
Returns |
|
------- |
|
if exists, return a leaf edge. otherwise, return None. |
|
''' |
|
for each_edge in self.edges: |
|
if len(self.adj_nodes(each_edge)) == 1: |
|
if 'tmp' not in self.edge_attr(each_edge): |
|
return each_edge |
|
return None |
|
|
|
def get_nontmp_edge(self): |
|
for each_edge in self.edges: |
|
if 'tmp' not in self.edge_attr(each_edge): |
|
return each_edge |
|
return None |
|
|
|
def is_subhg(self, hg): |
|
''' return whether this hypergraph is a subhypergraph of `hg` |
|
|
|
Returns |
|
------- |
|
True if self \in hg, |
|
False otherwise. |
|
''' |
|
for each_node in self.nodes: |
|
if each_node not in hg.nodes: |
|
return False |
|
for each_edge in self.edges: |
|
if each_edge not in hg.edges: |
|
return False |
|
return True |
|
|
|
def in_cycle(self, node, visited=None, parent='', root_node='') -> bool: |
|
''' if `node` is in a cycle, then return True. otherwise, False. |
|
|
|
Parameters |
|
---------- |
|
node : str |
|
node in a hypergraph |
|
visited : list |
|
list of visited nodes, used for recursion |
|
parent : str |
|
parent node, used to eliminate a cycle consisting of two nodes and one edge. |
|
|
|
Returns |
|
------- |
|
bool |
|
''' |
|
if visited is None: |
|
visited = [] |
|
if parent == '': |
|
visited = [] |
|
if root_node == '': |
|
root_node = node |
|
visited.append(node) |
|
for each_adj_node in self.adj_nodes(node): |
|
if each_adj_node not in visited: |
|
if self.in_cycle(each_adj_node, visited, node, root_node): |
|
return True |
|
elif each_adj_node != parent and each_adj_node == root_node: |
|
return True |
|
return False |
|
|
|
|
|
def draw(self, file_path=None, with_node=False, with_edge_name=False): |
|
''' draw hypergraph |
|
''' |
|
import graphviz |
|
G = graphviz.Graph(format='png') |
|
for each_node in self.nodes: |
|
if 'ext_id' in self.node_attr(each_node): |
|
G.node(each_node, label='', |
|
shape='circle', width='0.1', height='0.1', style='filled', |
|
fillcolor='black') |
|
else: |
|
if with_node: |
|
G.node(each_node, label='', |
|
shape='circle', width='0.1', height='0.1', style='filled', |
|
fillcolor='gray') |
|
edge_list = [] |
|
for each_edge in self.edges: |
|
if self.edge_attr(each_edge).get('terminal', False): |
|
G.node(each_edge, |
|
label=self.edge_attr(each_edge)['symbol'].symbol if not with_edge_name \ |
|
else self.edge_attr(each_edge)['symbol'].symbol + ', ' + each_edge, |
|
fontcolor='black', shape='square') |
|
elif self.edge_attr(each_edge).get('tmp', False): |
|
G.node(each_edge, label='tmp' if not with_edge_name else 'tmp, ' + each_edge, |
|
fontcolor='black', shape='square') |
|
else: |
|
G.node(each_edge, |
|
label=self.edge_attr(each_edge)['symbol'].symbol if not with_edge_name \ |
|
else self.edge_attr(each_edge)['symbol'].symbol + ', ' + each_edge, |
|
fontcolor='black', shape='square', style='filled') |
|
if with_node: |
|
for each_node in self.nodes_in_edge(each_edge): |
|
G.edge(each_edge, each_node) |
|
else: |
|
for each_node in self.nodes_in_edge(each_edge): |
|
if 'ext_id' in self.node_attr(each_node)\ |
|
and set([each_node, each_edge]) not in edge_list: |
|
G.edge(each_edge, each_node) |
|
edge_list.append(set([each_node, each_edge])) |
|
for each_other_edge in self.adj_nodes(each_edge): |
|
if set([each_edge, each_other_edge]) not in edge_list: |
|
num_bond = 0 |
|
common_node_set = set(self.nodes_in_edge(each_edge))\ |
|
.intersection(set(self.nodes_in_edge(each_other_edge))) |
|
for each_node in common_node_set: |
|
if self.node_attr(each_node)['symbol'].bond_type in [1, 2, 3]: |
|
num_bond += self.node_attr(each_node)['symbol'].bond_type |
|
elif self.node_attr(each_node)['symbol'].bond_type in [12]: |
|
num_bond += 1 |
|
else: |
|
raise NotImplementedError('unsupported bond type') |
|
for _ in range(num_bond): |
|
G.edge(each_edge, each_other_edge) |
|
edge_list.append(set([each_edge, each_other_edge])) |
|
if file_path is not None: |
|
G.render(file_path, cleanup=True) |
|
|
|
return G |
|
|
|
def is_dividable(self, node): |
|
_hg = deepcopy(self.hg) |
|
_hg.remove_node(node) |
|
return (not nx.is_connected(_hg)) |
|
|
|
def divide(self, node): |
|
subhg_list = [] |
|
|
|
hg_wo_node = deepcopy(self) |
|
hg_wo_node.remove_node(node, remove_connected_edges=False) |
|
connected_components = nx.connected_components(hg_wo_node.hg) |
|
for each_component in connected_components: |
|
node_list = [node] |
|
edge_list = [] |
|
node_list.extend([each_node for each_node in each_component |
|
if each_node.startswith('bond_')]) |
|
edge_list.extend([each_edge for each_edge in each_component |
|
if each_edge.startswith('e')]) |
|
subhg_list.append(self.get_subhg(node_list, edge_list)) |
|
|
|
return subhg_list |
|
|
|
|