import math from typing import List from itertools import chain import networkx as nx import plotly.graph_objs as go import numpy as np def get_pipeline_graph(pipeline): # Controls for how the graph is drawn nodeColor = "#ffbf00" nodeSize = 40 lineWidth = 2 lineColor = "#ffffff" G = pipeline.graph current_coordinate = (0, len(set([edge[0] for edge in G.edges()])) + 1) # Transform G.edges into {node : all_connected_nodes} format node_connections = {} for in_node, out_node in G.edges(): if in_node in node_connections: node_connections[in_node].append(out_node) else: node_connections[in_node] = [out_node] # Get node coordinates/pos fixed_pos_nodes = {} for idx, (in_node, out_nodes) in enumerate(node_connections.items()): if in_node not in fixed_pos_nodes: fixed_pos_nodes[in_node] = np.array([current_coordinate[0], current_coordinate[1]]) current_coordinate = (current_coordinate[0], current_coordinate[1] - 1) # If more than 1 out node, then branch out in X coordinate if len(out_nodes) > 1: # if length is odd if (len(out_nodes) % 2) != 0: middle_node = out_nodes[round(len(out_nodes)/2, 0) - 1] fixed_pos_nodes[middle_node] = np.array([current_coordinate[0], current_coordinate[1]]) out_nodes = [n for n in out_nodes if n != middle_node] correction_coordinate = - len(out_nodes) / 2 for out_node in out_nodes: fixed_pos_nodes[out_node] = np.array([int(current_coordinate[0] + correction_coordinate), int(current_coordinate[1])]) if correction_coordinate == -1: correction_coordinate += 1 correction_coordinate += 1 current_coordinate = (current_coordinate[0], current_coordinate[1] - 1) elif len(node_connections) - 1 == idx: fixed_pos_nodes[out_nodes[0]] = np.array([current_coordinate[0], current_coordinate[1]]) pos = nx.spring_layout(G, pos=fixed_pos_nodes, fixed=G.nodes(), seed=42) for node in G.nodes: G.nodes[node]["pos"] = list(pos[node]) # Make list of nodes for plotly node_x = [] node_y = [] node_name = [] for node in G.nodes(): node_name.append(G.nodes[node]["component"].name) x, y = G.nodes[node]["pos"] node_x.append(x) node_y.append(y) # Make a list of edges for plotly, including line segments that result in arrowheads edge_x = [] edge_y = [] for edge in G.edges(): start = G.nodes[edge[0]]["pos"] end = G.nodes[edge[1]]["pos"] # addEdge(start, end, edge_x, edge_y, lengthFrac=1, arrowPos = None, arrowLength=0.025, arrowAngle = 30, dotSize=20) edge_x, edge_y = addEdge( start, end, edge_x, edge_y, lengthFrac=0.5, arrowPos="end", arrowLength=0.04, arrowAngle=40, dotSize=nodeSize, ) edge_trace = go.Scatter( x=edge_x, y=edge_y, line=dict(width=lineWidth, color=lineColor), hoverinfo="none", mode="lines", ) node_trace = go.Scatter( x=node_x, y=node_y, mode="markers+text", textposition="middle right", hoverinfo="none", text=node_name, marker=dict(showscale=False, color=nodeColor, size=nodeSize), textfont=dict(size=18), ) fig = go.Figure( data=[edge_trace, node_trace], layout=go.Layout( showlegend=False, hovermode="closest", margin=dict(b=20, l=5, r=5, t=40), xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), ), ) fig.update_layout( yaxis=dict(scaleanchor="x", scaleratio=1), plot_bgcolor="rgb(14,17,23)" ) return fig def addEdge( start, end, edge_x, edge_y, lengthFrac=1, arrowPos=None, arrowLength=0.025, arrowAngle=30, dotSize=20, ): # Get start and end cartesian coordinates x0, y0 = start x1, y1 = end # Incorporate the fraction of this segment covered by a dot into total reduction length = math.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2) dotSizeConversion = 0.0565 / 20 # length units per dot size convertedDotDiameter = dotSize * dotSizeConversion lengthFracReduction = convertedDotDiameter / length lengthFrac = lengthFrac - lengthFracReduction # If the line segment should not cover the entire distance, get actual start and end coords skipX = (x1 - x0) * (1 - lengthFrac) skipY = (y1 - y0) * (1 - lengthFrac) x0 = x0 + skipX / 2 x1 = x1 - skipX / 2 y0 = y0 + skipY / 2 y1 = y1 - skipY / 2 # Append line corresponding to the edge edge_x.append(x0) edge_x.append(x1) edge_x.append( None ) # Prevents a line being drawn from end of this edge to start of next edge edge_y.append(y0) edge_y.append(y1) edge_y.append(None) # Draw arrow if not arrowPos == None: # Find the point of the arrow; assume is at end unless told middle pointx = x1 pointy = y1 eta = math.degrees(math.atan((x1 - x0) / (y1 - y0))) if y1 != y0 else 90.0 if arrowPos == "middle" or arrowPos == "mid": pointx = x0 + (x1 - x0) / 2 pointy = y0 + (y1 - y0) / 2 # Find the directions the arrows are pointing signx = (x1 - x0) / abs(x1 - x0) if x1 != x0 else +1 # verify this once signy = (y1 - y0) / abs(y1 - y0) if y1 != y0 else +1 # verified # Append first arrowhead dx = arrowLength * math.sin(math.radians(eta + arrowAngle)) dy = arrowLength * math.cos(math.radians(eta + arrowAngle)) edge_x.append(pointx) edge_x.append(pointx - signx**2 * signy * dx) edge_x.append(None) edge_y.append(pointy) edge_y.append(pointy - signx**2 * signy * dy) edge_y.append(None) # And second arrowhead dx = arrowLength * math.sin(math.radians(eta - arrowAngle)) dy = arrowLength * math.cos(math.radians(eta - arrowAngle)) edge_x.append(pointx) edge_x.append(pointx - signx**2 * signy * dx) edge_x.append(None) edge_y.append(pointy) edge_y.append(pointy - signx**2 * signy * dy) edge_y.append(None) return edge_x, edge_y def add_arrows( source_x: List[float], target_x: List[float], source_y: List[float], target_y: List[float], arrowLength=0.025, arrowAngle=30, ): pointx = list(map(lambda x: x[0] + (x[1] - x[0]) / 2, zip(source_x, target_x))) pointy = list(map(lambda x: x[0] + (x[1] - x[0]) / 2, zip(source_y, target_y))) etas = list( map( lambda x: math.degrees(math.atan((x[1] - x[0]) / (x[3] - x[2]))), zip(source_x, target_x, source_y, target_y), ) ) signx = list( map(lambda x: (x[1] - x[0]) / abs(x[1] - x[0]), zip(source_x, target_x)) ) signy = list( map(lambda x: (x[1] - x[0]) / abs(x[1] - x[0]), zip(source_y, target_y)) ) dx = list(map(lambda x: arrowLength * math.sin(math.radians(x + arrowAngle)), etas)) dy = list(map(lambda x: arrowLength * math.cos(math.radians(x + arrowAngle)), etas)) none_spacer = [None for _ in range(len(pointx))] arrow_line_x = list( map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointx, signx, signy, dx)) ) arrow_line_y = list( map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointy, signx, signy, dy)) ) arrow_line_1x_coords = list(chain(*zip(pointx, arrow_line_x, none_spacer))) arrow_line_1y_coords = list(chain(*zip(pointy, arrow_line_y, none_spacer))) dx = list(map(lambda x: arrowLength * math.sin(math.radians(x - arrowAngle)), etas)) dy = list(map(lambda x: arrowLength * math.cos(math.radians(x - arrowAngle)), etas)) none_spacer = [None for _ in range(len(pointx))] arrow_line_x = list( map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointx, signx, signy, dx)) ) arrow_line_y = list( map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointy, signx, signy, dy)) ) arrow_line_2x_coords = list(chain(*zip(pointx, arrow_line_x, none_spacer))) arrow_line_2y_coords = list(chain(*zip(pointy, arrow_line_y, none_spacer))) x_arrows = arrow_line_1x_coords + arrow_line_2x_coords y_arrows = arrow_line_1y_coords + arrow_line_2y_coords return x_arrows, y_arrows