# Source code for mFlow.Workflow.scheduler

import os
import networkx as nx
import time
from concurrent import futures
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
        

def run(flow, backend="sequential", num_workers=1, monitor=False, from_scratch=False):
    """Execute ``flow`` with the requested scheduler backend.

    Every node of ``flow.graph`` is first reset to status ``"notscheduled"``,
    then the call is forwarded to the matching scheduler function.

    Args:
        flow: Workflow object exposing ``graph`` and ``set_status`` plus
            whatever the selected scheduler needs.
        backend: One of ``"sequential"``, ``"multithread"``,
            ``"multiprocess"``, ``"pipeline"``, ``"multithread_pipeline"``
            or ``"multiprocess_pipeline"``.
        num_workers: Pool size, forwarded to the parallel backends only.
        monitor: Forwarded to the selected scheduler.
        from_scratch: Forwarded; re-run blocks even if they hold an output.

    Returns:
        Whatever the selected scheduler returns (a dict mapping each output
        node's ``out_tag`` to its ``out``).

    Raises:
        ValueError: if ``backend`` is not one of the names above. (An unknown
            backend used to fall through every branch and return ``None``.)
    """
    valid_backends = (
        "sequential",
        "multithread",
        "multiprocess",
        "pipeline",
        "multithread_pipeline",
        "multiprocess_pipeline",
    )
    # Validate before touching any node so a typo leaves the flow untouched.
    if backend not in valid_backends:
        raise ValueError(
            "Backend type is not known: %r (expected one of: %s)"
            % (backend, ", ".join(valid_backends))
        )

    for node_id in flow.graph.nodes():
        flow.set_status(flow.graph.nodes[node_id], "notscheduled")

    if backend == "sequential":
        return run_sequential(flow, monitor=monitor, from_scratch=from_scratch)
    if backend in ("multithread", "multiprocess"):
        return run_parallel(flow, backend=backend, num_workers=num_workers,
                            monitor=monitor, from_scratch=from_scratch)
    if backend == "pipeline":
        return run_pipeline(flow, monitor=monitor, from_scratch=from_scratch)
    # Remaining valid names: "multithread_pipeline" / "multiprocess_pipeline".
    return run_parallel_pipeline(flow, backend=backend, num_workers=num_workers,
                                 monitor=monitor, from_scratch=from_scratch)
def _release_consumed_output(graph, node_id):
    """Drop a finished block's ``out`` once every child of it is done.

    Only blocks explicitly flagged ``is_output == False`` are released; the
    node's ``fillcolor`` is set to ``"grey"`` when its output is dropped.
    """
    node = graph.nodes[node_id]
    block = node["block"]
    # `== False` (not `not ...`) is deliberate: it keeps the original rule that
    # only blocks explicitly flagged as non-outputs are released.
    if block.out is None or not (block.is_output == False):  # noqa: E712
        return
    if all(graph.nodes[child]["block"].status == "done"
           for child in graph.neighbors(node_id)):
        block.out = None
        node["fillcolor"] = "grey"


def run_sequential(flow, data=None, monitor=False, from_scratch=False):
    """Run the blocks of ``flow.graph`` one at a time in topological order.

    A block is executed (``block.run()``) when ``from_scratch`` is true or it
    has no output yet; otherwise its existing ``out`` is reused. After each
    step, intermediate results that are no longer needed are released: a
    block not flagged as an output has its ``out`` set to ``None`` once all
    of its children are done.

    Args:
        flow: Workflow object exposing ``graph``, ``set_status``, ``draw``
            and ``out_nodes``.
        data: Unused; kept for interface compatibility.
        monitor: If true, clear the IPython output and redraw the graph with
            ``flow.draw`` instead of printing progress text.
        from_scratch: If true, re-run every block even if it holds an output.

    Returns:
        dict mapping each output node's ``out_tag`` to its ``out``.
    """
    if monitor:
        from IPython import display
        display.clear_output()
    else:
        print("Running Sequential Scheduler\n")

    graph = flow.graph
    for node_id in graph.nodes():
        flow.set_status(graph.nodes[node_id], "notscheduled")

    for node_id in list(nx.topological_sort(graph)):
        node = graph.nodes[node_id]
        block = node["block"]
        if from_scratch or block.out is None:
            if not monitor:
                print("Running step %s" % block.name)
            flow.set_status(node, "running")
            if monitor:
                flow.draw(refresh=True)
            block.run()
        flow.set_status(node, "done")
        if not monitor:
            print("")

        # Finishing this node can only make *this* node (if it has no children)
        # or one of its parents eligible for release, so there is no need to
        # rescan every completed node on each step.
        for candidate in (node_id, *graph.predecessors(node_id)):
            _release_consumed_output(graph, candidate)

        if monitor:
            flow.draw()

    if not monitor:
        print("Workflow complete\n")
    return {n.out_tag: n.out for n in flow.out_nodes}
### Runs the pipelined graph in sequential order
def run_pipeline(flow, data=None, monitor=False, from_scratch=False):
    """Run the stages of ``flow.pipelineGraph`` one at a time in topological order.

    A stage is executed (``block.run()``) when ``from_scratch`` is true or it
    has no output yet; every stage ends the run with status ``"done"``.

    Args:
        flow: Workflow object exposing ``pipelineGraph``, ``set_status``,
            ``drawPipelined`` and ``out_nodes``.
        data: Unused; kept for interface compatibility.
        monitor: If true, redraw the pipelined graph instead of printing
            progress text.
        from_scratch: If true, re-run every stage even if it holds an output.

    Returns:
        dict mapping each output node's ``out_tag`` to its ``out``.
    """
    if not monitor:
        print("Running Sequential Pipeline Scheduler\n")

    pgraph = flow.pipelineGraph
    # Start from a clean slate, as run_parallel_pipeline does, so the monitor
    # view does not show statuses left over from a previous run.
    for node_id in pgraph.nodes():
        flow.set_status(pgraph.nodes[node_id], "notscheduled")

    for node_id in list(nx.topological_sort(pgraph)):
        node = pgraph.nodes[node_id]
        block = node["block"]
        if from_scratch or block.out is None:
            if not monitor:
                print("Running step %s" % block.name)
            flow.set_status(node, "running")
            if monitor:
                flow.drawPipelined(refresh=True)
            # NOTE(review): unlike run_parallel_pipeline, nothing here copies
            # the stage result onto flow.graph.nodes[block.tail]; presumably
            # block.run() does that itself in-process -- confirm.
            block.run()
        flow.set_status(node, "done")
        if monitor:
            flow.drawPipelined()

    if not monitor:
        print("Workflow complete\n")
    return {n.out_tag: n.out for n in flow.out_nodes}
def _resolved_future(value=None):
    """Return a ``concurrent.futures.Future`` that is already finished with *value*.

    Used for blocks whose cached output is reused. Unlike submitting a
    ``lambda`` to the pool, nothing has to be pickled, so this also works with
    ``ProcessPoolExecutor``.
    """
    future = futures.Future()
    future.set_result(value)
    return future


def _submit_ready_blocks(flow, ex, from_scratch):
    """Submit every not-yet-submitted block of ``flow.graph`` whose parents are done.

    Returns True if at least one block was scheduled in this pass.
    """
    graph = flow.graph
    scheduled_any = False
    for node_id in list(graph.nodes):
        node = graph.nodes[node_id]
        block = node["block"]
        if block.future is not None:
            continue  # already submitted during this run
        parents_done = all(
            graph.nodes[parent]["block"].status == "done"
            for parent in graph.predecessors(node_id)
        )
        if not parents_done:
            continue
        if from_scratch or block.out is None:
            args = block.get_args()
            kwargs = block.get_kwargs()
            block.out = None
            block.future = ex.submit(block.function, *args, **kwargs)
        else:
            # Cached output: nothing to compute, just a finished placeholder.
            block.future = _resolved_future()
        flow.set_status(node, "scheduled")
        scheduled_any = True
    return scheduled_any


def _sync_block_statuses(flow):
    """Mirror each block's future state onto its status and collect results.

    Returns True if any block's status changed. If a block's function raised,
    ``Future.result()`` re-raises that exception here.
    """
    changed = False
    for node_id in list(flow.graph.nodes):
        node = flow.graph.nodes[node_id]
        block = node["block"]
        future = block.future
        if future is None:
            continue
        if future.running() and block.status != "running":
            flow.set_status(node, "running")
            changed = True
        elif future.done() and block.status != "done":
            flow.set_status(node, "done")
            changed = True
            result = future.result()
            if block.out is None:
                block.out = result
        # Any other future is PENDING (the only state that is neither running
        # nor done), so there is nothing to update for it yet.
    return changed


def run_parallel(flow, data=None, backend="multithread", num_workers=1, monitor=False, from_scratch=False):
    """Run the blocks of ``flow.graph`` concurrently on an executor pool.

    A block's ``function`` is submitted with ``block.get_args()`` /
    ``block.get_kwargs()`` as soon as every predecessor in ``flow.graph`` has
    status ``"done"``. Blocks that already hold an output are not re-run
    unless ``from_scratch`` is true.

    Args:
        flow: Workflow object exposing ``graph``, ``set_status``, ``draw``
            and ``out_nodes``.
        data: Unused; kept for interface compatibility.
        backend: ``"multithread"`` (ThreadPoolExecutor) or ``"multiprocess"``
            (ProcessPoolExecutor).
        num_workers: Maximum number of pool workers.
        monitor: If true, redraw the graph with ``flow.draw`` instead of
            printing the final message.
        from_scratch: If true, re-run every block even if it holds an output.

    Returns:
        dict mapping each output node's ``out_tag`` to its ``out``.

    Raises:
        ValueError: if ``backend`` is not recognised.
        Any exception raised by a block's function is re-raised.
    """
    if backend == "multithread":
        pool_cls = futures.ThreadPoolExecutor
    elif backend == "multiprocess":
        pool_cls = futures.ProcessPoolExecutor
    else:
        raise ValueError("Backend type is not known")

    if not monitor:
        print("Running Parallel Scheduler\n")

    graph = flow.graph
    for node_id in graph.nodes:
        node = graph.nodes[node_id]
        # Forget futures from a previous run; otherwise nothing would be
        # resubmitted and from_scratch would be silently ignored.
        node["block"].future = None
        flow.set_status(node, "notscheduled")
    # Reset as earlier versions of this scheduler did; not read in this function.
    flow.parents = {}

    if monitor:
        flow.draw(refresh=True)

    # The with-statement shuts the pool down even if a block raises.
    with pool_cls(max_workers=num_workers) as ex:
        while not all(graph.nodes[n]["block"].status == "done" for n in graph.nodes):
            if _submit_ready_blocks(flow, ex, from_scratch) and monitor:
                flow.draw(refresh=True)

            # Wait until the first in-flight job finishes (or the timeout hits).
            active = [
                graph.nodes[n]["block"].future
                for n in graph.nodes
                if graph.nodes[n]["block"].status in ("scheduled", "running")
            ]
            if active:
                futures.wait(active, timeout=1, return_when=futures.FIRST_COMPLETED)
            else:
                time.sleep(0.5)

            if _sync_block_statuses(flow) and monitor:
                flow.draw(refresh=True)
            print('.', end="", flush=True)

    if monitor:
        flow.draw(refresh=False)
    else:
        print("Workflow complete\n")
    return {n.out_tag: n.out for n in flow.out_nodes}
# NOTE(review): the triple-quoted string below is an earlier, disabled
# implementation of run_parallel, kept as an inert module-level string literal.
# It is never executed; the live implementation is the run_parallel function
# defined above. Consider deleting it (version control keeps the history).
''' def run_parallel(flow,data=None,backend="multithread",num_workers=1,monitor=False,from_scratch=False): if(monitor==False): print("Running Parallel Scheduler\n") for id in flow.pipelineGraph.nodes(): node=flow.pipelineGraph.nodes[id] flow.set_status(node,"notscheduled") nodes_waiting = list(flow.graph.nodes) nodes_done = [] nodes_scheduled = [] nodes_running = [] active_futures = [] no_parents = True flow.parents = {} par_out = {} times = {} if(monitor): flow.draw(refresh=True) if(backend=="multithread"): executor = futures.ThreadPoolExecutor(max_workers=num_workers) elif(backend=="multiprocess"): executor = futures.ProcessPoolExecutor(max_workers=num_workers) else: raise with executor as ex: completed_nodes=[] while len(nodes_waiting)+len(nodes_scheduled)+len(nodes_running)>0: #Schedule nodes whose parents have all completed changed=False for node in nodes_waiting: parents = list(flow.graph.predecessors(node)) if(len(parents)==0 or all(x in nodes_done for x in parents)): changed=True nodes_waiting.remove(node) if from_scratch or flow.graph.nodes[node]["block"].out is None: nodes_scheduled.append(node) args = flow.graph.nodes[node]["block"].get_args() kwargs = flow.graph.nodes[node]["block"].get_kwargs() f = ex.submit(flow.graph.nodes[node]["block"].function, *args, **kwargs) par_out[node]=f active_futures.append(f) flow.set_status(flow.graph.nodes[node], "scheduled") if(monitor==False): print("Scheduled:", flow.graph.nodes[node]["block"].name) else: nodes_done.append(node) flow.set_status(flow.graph.nodes[node], "done") if(monitor==False): print("Done:", flow.graph.nodes[node]["block"].name) if(monitor and changed): flow.draw(refresh=True) #Wait until first scheduled job finishes, then process nodes #res = futures.wait(active_futures, timeout=0.4, return_when=futures.FIRST_COMPLETED) #active_futures = list(res.not_done) #Check for nodes moved to running state changed=False for node in nodes_scheduled: if par_out[node].running() or par_out[node].done(): 
changed=True nodes_running = nodes_running + [node] nodes_scheduled.remove(node) flow.set_status(flow.graph.nodes[node], "running") if(monitor==False): print("Running:", flow.graph.nodes[node]["block"].name) if(monitor and changed): flow.draw(refresh=True) #Check for nodes that are done changed=False for node in nodes_running: if par_out[node].done(): changed=True flow.graph.nodes[node]["block"].out=par_out[node].result() nodes_done = nodes_done + [node] nodes_running.remove(node) flow.set_status(flow.graph.nodes[node], "done") completed_nodes.append(node) if(monitor==False): print("Done:", flow.graph.nodes[node]["block"].name) if(monitor and changed): flow.draw(refresh=True) #Remove result output from nodes that are not workflow outputs #and whose children have all completed running. if(changed): changed = False for id in completed_nodes: this_node = flow.graph.nodes[id] if( (this_node["block"].out is not None) and (this_node["block"].is_output==False)): all_done=True for child_id in flow.graph.neighbors(id): child = flow.graph.nodes[child_id] if child["block"].status != "done": all_done=False if(all_done): this_node["block"].out = None this_node["fillcolor"]="grey" changed=True if(monitor and changed): flow.draw(refresh=True) time.sleep(0.5) print('.',end="",flush=True) for node in list(set(list(flow.graph.nodes)) - set(nodes_done)): flow.set_status(flow.graph.nodes[node], "done") flow.graph.nodes[node]["block"].out=par_out[node].result() if(monitor): flow.draw(refresh=False) else: print("Workflow complete\n") return({n.out_tag: n.out for n in flow.out_nodes}) ''' ### Runs the pipelined graph in parallel, either in multithreaded or multiprocessed configurations depending on the flag
def run_parallel_pipeline(flow, data=None, backend="multithread_pipeline", num_workers=1, monitor=False, from_scratch=False, refresh_rate=0.05):
    """Run the stages of ``flow.pipelineGraph`` concurrently on an executor pool.

    A stage is submitted (``ex.submit(block.run)``) once every predecessor in
    ``flow.pipelineGraph`` is done. Stages that already hold an output are
    marked done without running, unless ``from_scratch`` is true. When a stage
    finishes, its return value is stored on the stage's ``out`` and copied to
    ``flow.graph.nodes[block.tail]["block"].out``.

    Args:
        flow: Workflow object exposing ``pipelineGraph``, ``graph``,
            ``set_status``, ``drawPipelined`` and ``out_nodes``.
        data: Unused; kept for interface compatibility.
        backend: ``"multithread_pipeline"`` (ThreadPoolExecutor) or
            ``"multiprocess_pipeline"`` (ProcessPoolExecutor).
        num_workers: Maximum number of pool workers.
        monitor: If true, redraw the pipelined graph instead of printing
            progress text.
        from_scratch: If true, re-run every stage even if it holds an output.
        refresh_rate: Minimum number of seconds between monitor redraws.

    Returns:
        dict mapping each output node's ``out_tag`` to its ``out``.

    Raises:
        ValueError: if ``backend`` is not recognised. (This used to be a bare
            ``raise``, which surfaced as ``RuntimeError: No active exception
            to reraise``.)
        Any exception raised by a stage's ``run`` is re-raised.
    """
    if backend == "multithread_pipeline":
        pool_cls = futures.ThreadPoolExecutor
    elif backend == "multiprocess_pipeline":
        pool_cls = futures.ProcessPoolExecutor
    else:
        raise ValueError("Backend type is not known")

    if not monitor:
        print("Running Parallel Pipeline Scheduler\n")

    pgraph = flow.pipelineGraph
    for node_id in pgraph.nodes():
        flow.set_status(pgraph.nodes[node_id], "notscheduled")

    nodes_waiting = list(pgraph.nodes)
    nodes_scheduled = []
    nodes_running = []
    nodes_done = set()  # set: tested for membership once per parent per pass
    par_out = {}
    # Reset as earlier versions of this scheduler did; not read in this function.
    flow.parents = {}
    poll_interval = 0.5  # seconds to wait before re-polling future states

    if monitor:
        flow.drawPipelined(refresh=True)

    last_draw = time.time()
    with pool_cls(max_workers=num_workers) as ex:
        while nodes_waiting or nodes_scheduled or nodes_running:
            changed = False

            # 1. Submit (or short-circuit cached) stages whose parents are done.
            #    nodes_done is updated live, so a cached stage can unblock a
            #    later stage within the same pass.
            for node_id in nodes_waiting[:]:
                if not all(p in nodes_done for p in pgraph.predecessors(node_id)):
                    continue
                changed = True
                nodes_waiting.remove(node_id)
                node = pgraph.nodes[node_id]
                block = node["block"]
                if from_scratch or block.out is None:
                    par_out[node_id] = ex.submit(block.run)
                    nodes_scheduled.append(node_id)
                    flow.set_status(node, "scheduled")
                    if not monitor:
                        print("Scheduled:", block.name)
                else:
                    nodes_done.add(node_id)
                    flow.set_status(node, "done")
                    if not monitor:
                        print("Done:", block.name)

            # 2. Promote scheduled stages whose futures have started (or finished).
            for node_id in nodes_scheduled[:]:
                future = par_out[node_id]
                if future.running() or future.done():
                    changed = True
                    nodes_scheduled.remove(node_id)
                    nodes_running.append(node_id)
                    flow.set_status(pgraph.nodes[node_id], "running")

            # 3. Collect finished stages and publish their results.
            for node_id in nodes_running[:]:
                future = par_out[node_id]
                if not future.done():
                    continue
                changed = True
                block = pgraph.nodes[node_id]["block"]
                if not monitor and future.exception() is not None:
                    print(future.exception())
                result = future.result()  # re-raises the stage's exception, if any
                block.out = result
                flow.graph.nodes[block.tail]["block"].out = result
                nodes_done.add(node_id)
                nodes_running.remove(node_id)
                flow.set_status(pgraph.nodes[node_id], "done")
                if not monitor:
                    print("Done:", block.name)

            if monitor and changed and time.time() - last_draw > refresh_rate:
                flow.drawPipelined(refresh=True)
                last_draw = time.time()

            # Wake as soon as a stage finishes instead of always sleeping.
            active = [par_out[n] for n in nodes_scheduled + nodes_running]
            if active:
                futures.wait(active, timeout=poll_interval,
                             return_when=futures.FIRST_COMPLETED)
            elif not changed:
                time.sleep(poll_interval)
            print('.', end="", flush=True)

    if monitor:
        flow.drawPipelined(refresh=False)
    else:
        print("Workflow complete\n")
    return {n.out_tag: n.out for n in flow.out_nodes}