import networkx as nx
import time
from concurrent import futures
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import networkx as nx
from networkx.generators.triads import triad_graph
import mFlow.Workflow.scheduler as scheduler
from mFlow.Workflow.compute_graph import pipelineNode
import os
[docs]class workflow():
[docs] def __init__(self, nodes={}):
'''
Workflow constructor
Args:
nodes (dict): a dictionary of workflow output nodes. Keys are used as tags
to identify outputs.
'''
self.graph=nx.DiGraph()
self.labels={}
self.out_nodes = []
self.pipeline_dict = {}
self.pipelineGraph = nx.DiGraph()
#If have compute nodes, add to graph
#along with all parents
for tag in nodes:
node = nodes[tag]
self.add_output(node, tag)
self.pipeline(self.graph)
[docs] def add_output(self, node, tag):
'''
Adds the given workflow node to the set of workflow outputs with the given tag,
then adds all of the node's ancestors to the workflow.
Args:
node: a workflow node
tag: a string to identifiy the node
'''
node.out_tag = tag
node.is_output=True
self.add(node)
[docs] def add(self, nodes):
'''
Adds the given list of workflow nodes to the workflow, along with all
of their ancestors. Sets the out_tag property of the nodes if not
already set.
Args:
nodes: a list of workflow nodes (or a single workflow node).
'''
if type(nodes) is not list:
nodes=[nodes]
self.out_nodes = self.out_nodes + nodes
for node in nodes:
if(not hasattr(node,"out_tag")):
node.out_tag = node.name
self.recursive_add_node(node)
[docs] def remove(self, nodes):
'''
Removes the given workflow nodes from the workflow, along with all of their
descendents.
Args:
nodes: a list of workflow nodes (or a single workflow node).
'''
if type(nodes) is not list:
nodes = [nodes]
for node in nodes:
node_id = str(id(node))
#print(" Removing: ", node.name, node_id)
if node_id in self.graph.nodes():
#Get descendents
descendants = nx.descendants(self.graph,source=node_id)
#Remove node from graph
self.graph.remove_node(node_id)
if(node in self.out_nodes):
self.out_nodes.remove(node)
#Remove any descendents from graph
for descendant_id in descendants:
this_node = self.graph.nodes[descendant_id]["block"]
print(" Removing: ", this_node.name, descendant_id)
self.graph.remove_node(descendant_id)
if(this_node in self.out_nodes):
self.out_nodes.remove(this_node)
[docs] def recursive_add_node(self, node):
'''
Adds the given list of workflow nodes to the workflow, along with all
of their ancestors. If the node is already in the workflow, does
not add duplicates.
Args:
nodes: a list of workflow nodes.
'''
#Get node name and ID
node_id = str(id(node))
# print(str(id(node)))
if node_id in self.graph.nodes(): #list(self.blocks.keys()):
#If node exists, do not add agan,
#just return node name
return(node_id)
else:
#If node does not exist, add to graph
self.add_node(node_id, node)
#Check for all nodes that are parents in args list
#and add, with edges to this node
for i in node.args_parents:
parent_id = self.recursive_add_node(node.args_parents[i])
self.graph.add_edge(parent_id, node_id)
#Check for all nodes that are parents in kwargs list
#and add, with edges to this node
for kw in node.kwargs_parents:
parent_id = self.recursive_add_node(node.kwargs_parents[kw])
self.graph.add_edge(parent_id, node_id)
return(node_id)
[docs] def add_node(self, name, block):
'''
Adds a node to the workflow with a specified name and block.
Args:
name (str): unique name for workflow node
block: the workflow block for this node
'''
#self.blocks[name] = block
self.graph.add_node(name, block=block, label=block.name, shape="box", style="filled", fillcolor="white",color="black", fontname="helvetica")
self.labels[name] = block.name
[docs] def add_edge(self, node_from, node_to):
'''
Add a directed edge between nodes
Args:
node_from: ID of from node
node_to: ID of to node
'''
self.graph.add_edge(node_from, node_to)
[docs] def set_status(self,node,status):
'''
Set the status of a workflow node
Args:
node: a workflow node
status (string): scheduled | notscheduled | running | done
'''
node["block"].status=status
if(status=="scheduled"):
node["fillcolor"]="lemonchiffon"
elif(status=="notscheduled"):
node["fillcolor"]="white"
elif(status=="running"):
node["fillcolor"]="palegreen3"
elif(status=="done"):
node["fillcolor"]="lightblue"
[docs] def draw(self, refresh=True):
'''
Display the workflow graph in a Jupyter notebook
Args:
refresh (bool): If True, clear the cell before drawing.
'''
import networkx as nx
from networkx.drawing.nx_agraph import write_dot, graphviz_layout
import matplotlib.pyplot as plt
from IPython.display import Image, display,clear_output
pdot = nx.drawing.nx_pydot.to_pydot(self.graph)
pdot.set_rankdir("LR")
if not os.path.exists("Temp"):
os.mkdir("Temp")
pdot.write_png("Temp/temp.png")
if(refresh):
clear_output(wait=True)
display(Image(filename='Temp/temp.png'))
[docs] def drawPipelined(self, refresh=True):
'''
Display pipilined workflow graph in a Jupyter notebook.
Args:
refresh (bool): If True, clear the cell before drawing.
'''
import networkx as nx
from networkx.drawing.nx_agraph import write_dot, graphviz_layout
import matplotlib.pyplot as plt
from IPython.display import Image, display,clear_output
pdot = nx.drawing.nx_pydot.to_pydot(self.pipelineGraph)
pdot.set_rankdir("LR")
pdot.write_png("Temp/temp.png")
if(refresh):
clear_output(wait=True)
display(Image(filename='Temp/temp.png'))
[docs] def run(self, backend="sequential", num_workers=1, monitor=False,from_scratch=False):
'''
Run the workflow with the specified backend scheduler.
Args:
backend (string): The type of scheduling backend to use (sequential | multithread | multiprocess | pipeline | multithread_pipeline | multiprocess_pipeline). See mFlow.Workflow.scheduler for documentation.
num_workers (int): Number of workers to use in parallel backends
monitor (bool): If True, use graphical execution monitorin for Jupyter notebooks
from_scratch (bool): If True, run the workflow from scratch, discarding any cached results.
'''
return scheduler.run(self, backend=backend, num_workers=num_workers, monitor=monitor,from_scratch=from_scratch)
[docs] def add_pipeline_node(self, id, plNode):
'''
Add a pipline node to pipeline workflow graph
Args:
id (string): id for the node
plNode: pipelined workflow node
'''
self.pipelineGraph.add_node(id, block=plNode, label=plNode.name, shape="box", style="filled", fillcolor="white",color="black", fontname="helvetica")
[docs] def pipelineGraphCreate(self, process_dict):
'''
Convert a dictionary of pipelined workflow nodes to a pipelined workflow graph.
Args:
process_dict: dictionary of pipelined workflow nodes.
'''
pipelineNodeList = []
for key in process_dict:
plNode = pipelineNode(self.graph, process_dict[key], key)
pipelineNodeList.append(plNode)
self.add_pipeline_node(key, plNode)
for plNode in pipelineNodeList:
for plNode_adj in pipelineNodeList:
# print(self.graph.node[plNode.tail]["block"])
# print(list(self.graph.node[plNode_adj.head]["block"].args_parents.values()))
# print("next")
if self.graph.nodes[plNode.tail]["block"] in self.graph.nodes[plNode_adj.head]["block"].args_parents.values():
# print("1")
self.pipelineGraph.add_edge(plNode.id, plNode_adj.id)
[docs] def pipeline(self, initGraph):
'''
Convert the given workflow graph to a pipelined representation where chanins in the workflow graph are replaced
by single node.
Args:
initGraph: A workflow graph.
'''
execute_order = list(nx.topological_sort(initGraph))
# print("execute order : ")
root_nodes = []
for idx, id in enumerate(execute_order):
if len(list(initGraph.predecessors(id))) == 0:
root_nodes.append(idx)
#print(initGraph.node[id]["block"].name + str(id))
node_lists = []
for idx in root_nodes:
node_lists.append(list(nx.dfs_preorder_nodes(initGraph, source=execute_order[idx])))
process_dict = {}
marked_nodes = []
key_num = 0
#print(node_lists)
for node_list in node_lists:
for id in node_list:
marked_nodes.append(id)
if len(list(initGraph.predecessors(id))) == 0:
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
if len(list(initGraph.successors(id))) == 1:
if list(initGraph.successors(id))[0] in marked_nodes:
key_num += 1
elif len(list(initGraph.successors(id))) == 0:
key_num += 1
elif len(list(initGraph.successors(id))) > 1:
key_num += 1
elif len(list(initGraph.predecessors(id))) == 1:
if len(list(initGraph.successors(id))) == 1:
if key_num not in process_dict.keys():
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
else:
# process_dict[key_num].append(initGraph.node[id]["block"].name)
process_dict[key_num].append(id)
if list(initGraph.successors(id))[0] in marked_nodes:
key_num += 1
elif len(list(initGraph.successors(id))) == 0:
if key_num not in process_dict.keys():
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
key_num += 1
else:
# process_dict[key_num].append(initGraph.node[id]["block"].name)
process_dict[key_num].append(id)
key_num += 1
elif len(list(initGraph.successors(id))) > 1:
if key_num not in process_dict.keys():
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
else:
# process_dict[key_num].append(initGraph.node[id]["block"].name)
process_dict[key_num].append(id)
key_num += 1
elif len(list(initGraph.predecessors(id))) > 1:
key_num += 1
if len(list(initGraph.successors(id))) == 1:
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
if list(initGraph.successors(id))[0] in marked_nodes:
key_num += 1
elif len(list(initGraph.successors(id))) == 0:
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
key_num += 1
elif len(list(initGraph.successors(id))) > 1:
# process_dict[key_num] = [initGraph.node[id]["block"].name]
process_dict[key_num] = [id]
key_num += 1
# print(process_dict)
self.pipelineGraphCreate(process_dict)