Spaces:
Runtime error
Runtime error
import math | |
from typing import List | |
from itertools import chain | |
import networkx as nx | |
import plotly.graph_objs as go | |
import numpy as np | |
def get_pipeline_graph(pipeline): | |
# Controls for how the graph is drawn | |
nodeColor = "#ffbf00" | |
nodeSize = 40 | |
lineWidth = 2 | |
lineColor = "#ffffff" | |
G = pipeline.graph | |
current_coordinate = (0, len(set([edge[0] for edge in G.edges()])) + 1) | |
# Transform G.edges into {node : all_connected_nodes} format | |
node_connections = {} | |
for in_node, out_node in G.edges(): | |
if in_node in node_connections: | |
node_connections[in_node].append(out_node) | |
else: | |
node_connections[in_node] = [out_node] | |
# Get node coordinates/pos | |
fixed_pos_nodes = {} | |
for idx, (in_node, out_nodes) in enumerate(node_connections.items()): | |
if in_node not in fixed_pos_nodes: | |
fixed_pos_nodes[in_node] = np.array([current_coordinate[0], current_coordinate[1]]) | |
current_coordinate = (current_coordinate[0], current_coordinate[1] - 1) | |
# If more than 1 out node, then branch out in X coordinate | |
if len(out_nodes) > 1: | |
# if length is odd | |
if (len(out_nodes) % 2) != 0: | |
middle_node = out_nodes[round(len(out_nodes)/2, 0) - 1] | |
fixed_pos_nodes[middle_node] = np.array([current_coordinate[0], current_coordinate[1]]) | |
out_nodes = [n for n in out_nodes if n != middle_node] | |
correction_coordinate = - len(out_nodes) / 2 | |
for out_node in out_nodes: | |
fixed_pos_nodes[out_node] = np.array([int(current_coordinate[0] + correction_coordinate), int(current_coordinate[1])]) | |
if correction_coordinate == -1: | |
correction_coordinate += 1 | |
correction_coordinate += 1 | |
current_coordinate = (current_coordinate[0], current_coordinate[1] - 1) | |
elif len(node_connections) - 1 == idx: | |
fixed_pos_nodes[out_nodes[0]] = np.array([current_coordinate[0], current_coordinate[1]]) | |
pos = nx.spring_layout(G, pos=fixed_pos_nodes, fixed=G.nodes(), seed=42) | |
for node in G.nodes: | |
G.nodes[node]["pos"] = list(pos[node]) | |
# Make list of nodes for plotly | |
node_x = [] | |
node_y = [] | |
node_name = [] | |
for node in G.nodes(): | |
node_name.append(G.nodes[node]["component"].name) | |
x, y = G.nodes[node]["pos"] | |
node_x.append(x) | |
node_y.append(y) | |
# Make a list of edges for plotly, including line segments that result in arrowheads | |
edge_x = [] | |
edge_y = [] | |
for edge in G.edges(): | |
start = G.nodes[edge[0]]["pos"] | |
end = G.nodes[edge[1]]["pos"] | |
# addEdge(start, end, edge_x, edge_y, lengthFrac=1, arrowPos = None, arrowLength=0.025, arrowAngle = 30, dotSize=20) | |
edge_x, edge_y = addEdge( | |
start, | |
end, | |
edge_x, | |
edge_y, | |
lengthFrac=0.5, | |
arrowPos="end", | |
arrowLength=0.04, | |
arrowAngle=40, | |
dotSize=nodeSize, | |
) | |
edge_trace = go.Scatter( | |
x=edge_x, | |
y=edge_y, | |
line=dict(width=lineWidth, color=lineColor), | |
hoverinfo="none", | |
mode="lines", | |
) | |
node_trace = go.Scatter( | |
x=node_x, | |
y=node_y, | |
mode="markers+text", | |
textposition="middle right", | |
hoverinfo="none", | |
text=node_name, | |
marker=dict(showscale=False, color=nodeColor, size=nodeSize), | |
textfont=dict(size=18), | |
) | |
fig = go.Figure( | |
data=[edge_trace, node_trace], | |
layout=go.Layout( | |
showlegend=False, | |
hovermode="closest", | |
margin=dict(b=20, l=5, r=5, t=40), | |
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
), | |
) | |
fig.update_layout( | |
yaxis=dict(scaleanchor="x", scaleratio=1), plot_bgcolor="rgb(14,17,23)" | |
) | |
return fig | |
def addEdge( | |
start, | |
end, | |
edge_x, | |
edge_y, | |
lengthFrac=1, | |
arrowPos=None, | |
arrowLength=0.025, | |
arrowAngle=30, | |
dotSize=20, | |
): | |
# Get start and end cartesian coordinates | |
x0, y0 = start | |
x1, y1 = end | |
# Incorporate the fraction of this segment covered by a dot into total reduction | |
length = math.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2) | |
dotSizeConversion = 0.0565 / 20 # length units per dot size | |
convertedDotDiameter = dotSize * dotSizeConversion | |
lengthFracReduction = convertedDotDiameter / length | |
lengthFrac = lengthFrac - lengthFracReduction | |
# If the line segment should not cover the entire distance, get actual start and end coords | |
skipX = (x1 - x0) * (1 - lengthFrac) | |
skipY = (y1 - y0) * (1 - lengthFrac) | |
x0 = x0 + skipX / 2 | |
x1 = x1 - skipX / 2 | |
y0 = y0 + skipY / 2 | |
y1 = y1 - skipY / 2 | |
# Append line corresponding to the edge | |
edge_x.append(x0) | |
edge_x.append(x1) | |
edge_x.append( | |
None | |
) # Prevents a line being drawn from end of this edge to start of next edge | |
edge_y.append(y0) | |
edge_y.append(y1) | |
edge_y.append(None) | |
# Draw arrow | |
if not arrowPos == None: | |
# Find the point of the arrow; assume is at end unless told middle | |
pointx = x1 | |
pointy = y1 | |
eta = math.degrees(math.atan((x1 - x0) / (y1 - y0))) if y1 != y0 else 90.0 | |
if arrowPos == "middle" or arrowPos == "mid": | |
pointx = x0 + (x1 - x0) / 2 | |
pointy = y0 + (y1 - y0) / 2 | |
# Find the directions the arrows are pointing | |
signx = (x1 - x0) / abs(x1 - x0) if x1 != x0 else +1 # verify this once | |
signy = (y1 - y0) / abs(y1 - y0) if y1 != y0 else +1 # verified | |
# Append first arrowhead | |
dx = arrowLength * math.sin(math.radians(eta + arrowAngle)) | |
dy = arrowLength * math.cos(math.radians(eta + arrowAngle)) | |
edge_x.append(pointx) | |
edge_x.append(pointx - signx**2 * signy * dx) | |
edge_x.append(None) | |
edge_y.append(pointy) | |
edge_y.append(pointy - signx**2 * signy * dy) | |
edge_y.append(None) | |
# And second arrowhead | |
dx = arrowLength * math.sin(math.radians(eta - arrowAngle)) | |
dy = arrowLength * math.cos(math.radians(eta - arrowAngle)) | |
edge_x.append(pointx) | |
edge_x.append(pointx - signx**2 * signy * dx) | |
edge_x.append(None) | |
edge_y.append(pointy) | |
edge_y.append(pointy - signx**2 * signy * dy) | |
edge_y.append(None) | |
return edge_x, edge_y | |
def add_arrows( | |
source_x: List[float], | |
target_x: List[float], | |
source_y: List[float], | |
target_y: List[float], | |
arrowLength=0.025, | |
arrowAngle=30, | |
): | |
pointx = list(map(lambda x: x[0] + (x[1] - x[0]) / 2, zip(source_x, target_x))) | |
pointy = list(map(lambda x: x[0] + (x[1] - x[0]) / 2, zip(source_y, target_y))) | |
etas = list( | |
map( | |
lambda x: math.degrees(math.atan((x[1] - x[0]) / (x[3] - x[2]))), | |
zip(source_x, target_x, source_y, target_y), | |
) | |
) | |
signx = list( | |
map(lambda x: (x[1] - x[0]) / abs(x[1] - x[0]), zip(source_x, target_x)) | |
) | |
signy = list( | |
map(lambda x: (x[1] - x[0]) / abs(x[1] - x[0]), zip(source_y, target_y)) | |
) | |
dx = list(map(lambda x: arrowLength * math.sin(math.radians(x + arrowAngle)), etas)) | |
dy = list(map(lambda x: arrowLength * math.cos(math.radians(x + arrowAngle)), etas)) | |
none_spacer = [None for _ in range(len(pointx))] | |
arrow_line_x = list( | |
map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointx, signx, signy, dx)) | |
) | |
arrow_line_y = list( | |
map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointy, signx, signy, dy)) | |
) | |
arrow_line_1x_coords = list(chain(*zip(pointx, arrow_line_x, none_spacer))) | |
arrow_line_1y_coords = list(chain(*zip(pointy, arrow_line_y, none_spacer))) | |
dx = list(map(lambda x: arrowLength * math.sin(math.radians(x - arrowAngle)), etas)) | |
dy = list(map(lambda x: arrowLength * math.cos(math.radians(x - arrowAngle)), etas)) | |
none_spacer = [None for _ in range(len(pointx))] | |
arrow_line_x = list( | |
map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointx, signx, signy, dx)) | |
) | |
arrow_line_y = list( | |
map(lambda x: x[0] - x[1] ** 2 * x[2] * x[3], zip(pointy, signx, signy, dy)) | |
) | |
arrow_line_2x_coords = list(chain(*zip(pointx, arrow_line_x, none_spacer))) | |
arrow_line_2y_coords = list(chain(*zip(pointy, arrow_line_y, none_spacer))) | |
x_arrows = arrow_line_1x_coords + arrow_line_2x_coords | |
y_arrows = arrow_line_1y_coords + arrow_line_2y_coords | |
return x_arrows, y_arrows | |