mFlow.Workflow Package

mFlow.Workflow.compute_graph Module

class mFlow.Workflow.compute_graph.node(function=None, args=[], kwargs={}, name=None, parents=[])

Bases: object

__init__(function=None, args=[], kwargs={}, name=None, parents=[])
get_args()
get_kwargs()
run()
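
The node class above has no docstrings. As a rough illustration of what a node holding function, args, kwargs, name, and parents plausibly does, here is a self-contained toy stand-in. ToyNode is hypothetical and is not mFlow's implementation; it assumes that node-valued arguments are replaced by those nodes' results when run() is called. It uses None defaults instead of the mutable [] and {} defaults in the signature, so instances never share one list or dict.

```python
class ToyNode:
    """Hypothetical stand-in for a compute-graph node (illustration only)."""

    def __init__(self, function=None, args=None, kwargs=None, name=None, parents=None):
        self.function = function
        self.args = list(args or [])
        self.kwargs = dict(kwargs or {})
        self.name = name
        self.parents = list(parents or [])
        self.result = None

    def get_args(self):
        # Assumption: any ToyNode argument is replaced by that node's result.
        return [a.result if isinstance(a, ToyNode) else a for a in self.args]

    def get_kwargs(self):
        return {k: (v.result if isinstance(v, ToyNode) else v) for k, v in self.kwargs.items()}

    def run(self):
        # Call the wrapped function with resolved arguments and cache the result.
        self.result = self.function(*self.get_args(), **self.get_kwargs())
        return self.result


load = ToyNode(function=lambda: [1, 2, 3], name="load")
total = ToyNode(function=sum, args=[load], name="total", parents=[load])
load.run()
print(total.run())  # 6
```

The parent must run before the child here; ordering nodes that way is the scheduler's job.
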
class mFlow.Workflow.compute_graph.pipelineNode(initGraph, node_list, id)

Bases: object

__init__(initGraph, node_list, id)
run()
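
A pipelineNode groups several graph nodes into one schedulable unit. The sketch below shows that idea in isolation, under the simplifying assumption that a pipeline is a linear chain of single-argument steps. ToyPipelineNode and its steps and node_id parameters are hypothetical and do not mirror the real (initGraph, node_list, id) signature.

```python
class ToyPipelineNode:
    """Hypothetical pipelined node: a linear chain of steps run as one unit."""

    def __init__(self, steps, node_id):
        self.steps = list(steps)  # single-argument callables, in chain order
        self.node_id = node_id

    def run(self, value):
        # Feed each step's output straight into the next step.
        for step in self.steps:
            value = step(value)
        return value


chain = ToyPipelineNode([lambda x: x + 1, lambda x: x * 2, str], node_id="p0")
print(chain.run(3))  # 8
```

Running a chain inside one unit means a scheduler dispatches it once rather than once per step.
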

mFlow.Workflow.scheduler Module

mFlow.Workflow.scheduler.run(flow, backend='sequential', num_workers=1, monitor=False, from_scratch=False)
mFlow.Workflow.scheduler.run_parallel(flow, data=None, backend='multithread', num_workers=1, monitor=False, from_scratch=False)
mFlow.Workflow.scheduler.run_parallel_pipeline(flow, data=None, backend='multithread_pipeline', num_workers=1, monitor=False, from_scratch=False, refresh_rate=0.05)
mFlow.Workflow.scheduler.run_pipeline(flow, data=None, monitor=False, from_scratch=False)
mFlow.Workflow.scheduler.run_sequential(flow, data=None, monitor=False, from_scratch=False)
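
A sequential backend has to execute each node only after all of its parents. One standard way to get such an order is a topological sort; the sketch below uses the standard-library graphlib.TopologicalSorter (Python 3.9+) on a hypothetical dict-based DAG. It illustrates the scheduling idea and is not mFlow's scheduler code.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG format: node name -> (function, list of parent names).
dag = {
    "load": (lambda: [4, 1, 3], []),
    "sort": (sorted, ["load"]),
    "head": (lambda xs: xs[0], ["sort"]),
}


def run_sequential(dag):
    """Run every node once, parents first, passing parent results positionally."""
    deps = {name: parents for name, (_, parents) in dag.items()}
    results = {}
    for name in TopologicalSorter(deps).static_order():
        func, parents = dag[name]
        results[name] = func(*(results[p] for p in parents))
    return results


print(run_sequential(dag)["head"])  # 1
```
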

mFlow.Workflow.workflow Module

class mFlow.Workflow.workflow.workflow(nodes={})

Bases: object

__init__(nodes={})

Workflow constructor

Parameters
  • nodes (dict) – a dictionary of workflow output nodes. Keys are used as tags to identify outputs.

add(nodes)

Adds the given list of workflow nodes to the workflow, along with all of their ancestors. Sets the out_tag property of the nodes if not already set.

Parameters

nodes – a list of workflow nodes (or a single workflow node).
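
Adding nodes together with all of their ancestors amounts to a traversal over parent links that skips nodes already present. A minimal sketch, assuming a hypothetical SimpleNode with name, parents, and out_tag attributes, and assuming (the docstring does not say) that the default tag is the node's name:

```python
class SimpleNode:
    """Hypothetical node: a name, parent nodes, and an optional output tag."""

    def __init__(self, name, parents=()):
        self.name = name
        self.parents = list(parents)
        self.out_tag = None


def add(graph, nodes):
    """Add nodes (a list or a single node) plus all their ancestors to graph.

    graph maps node name -> list of parent names; existing entries are kept.
    """
    if isinstance(nodes, SimpleNode):
        nodes = [nodes]
    for node in nodes:
        if node.out_tag is None:
            node.out_tag = node.name  # assumed default tag
        stack = [node]  # explicit stack instead of recursion
        while stack:
            current = stack.pop()
            if current.name not in graph:  # skip nodes already added
                graph[current.name] = [p.name for p in current.parents]
                stack.extend(current.parents)
    return graph


a = SimpleNode("a")
b = SimpleNode("b", [a])
c = SimpleNode("c", [a, b])
print(add({}, c))  # {'c': ['a', 'b'], 'b': ['a'], 'a': []}
```

Only the nodes passed in receive a tag; ancestors are added to the graph but keep out_tag as None.
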

add_edge(node_from, node_to)

Adds a directed edge from node_from to node_to.

Parameters
  • node_from – ID of the source node

  • node_to – ID of the destination node

add_node(name, block)

Adds a node to the workflow with a specified name and block.

Parameters
  • name (str) – unique name for workflow node

  • block – the workflow block for this node
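
A hypothetical in-memory layout for add_node and add_edge: blocks keyed by unique node name, plus a set of children per node. Raising on duplicate names and on unknown edge endpoints is a choice made for this sketch; the docstrings above only say that names must be unique.

```python
class ToyGraph:
    """Hypothetical workflow graph: blocks keyed by name, edges as child sets."""

    def __init__(self):
        self.blocks = {}    # node name -> block
        self.children = {}  # node name -> set of child node names

    def add_node(self, name, block):
        if name in self.blocks:
            raise ValueError(f"node name already in use: {name!r}")
        self.blocks[name] = block
        self.children[name] = set()

    def add_edge(self, node_from, node_to):
        # Both endpoints must already exist as nodes.
        for name in (node_from, node_to):
            if name not in self.blocks:
                raise KeyError(f"unknown node: {name!r}")
        self.children[node_from].add(node_to)


g = ToyGraph()
g.add_node("load", "loader block")
g.add_node("clean", "cleaner block")
g.add_edge("load", "clean")
print(g.children)  # {'load': {'clean'}, 'clean': set()}
```
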

add_output(node, tag)

Adds the given workflow node to the set of workflow outputs with the given tag, then adds all of the node’s ancestors to the workflow.

Parameters
  • node – a workflow node

  • tag – a string to identify the node

add_pipeline_node(id, plNode)

Add a pipeline node to the pipelined workflow graph.

Parameters
  • id (string) – id for the node

  • plNode – pipelined workflow node

draw(refresh=True)

Display the workflow graph in a Jupyter notebook

Parameters

refresh (bool) – If True, clear the cell before drawing.

drawPipelined(refresh=True)

Display the pipelined workflow graph in a Jupyter notebook.

Parameters

refresh (bool) – If True, clear the cell before drawing.

pipeline(initGraph)

Convert the given workflow graph to a pipelined representation in which chains in the workflow graph are replaced by single nodes.

Parameters

initGraph – A workflow graph.
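
Pipelining hinges on finding chains. One plausible rule, assumed here rather than taken from mFlow's source: a node joins its parent's chain when it has exactly one parent and that parent has exactly one child. The sketch below groups an acyclic graph, given as a hypothetical children adjacency dict, into maximal chains under that rule.

```python
def find_chains(children):
    """Group the nodes of a DAG into maximal linear chains.

    children maps each node to a list of its child nodes (input assumed acyclic).
    """
    parents = {n: [] for n in children}
    for n, kids in children.items():
        for k in kids:
            parents[k].append(n)

    def continues_chain(n):
        # n extends its parent's chain: single parent, and that parent has one child.
        return len(parents[n]) == 1 and len(children[parents[n][0]]) == 1

    chains = []
    for n in children:
        if continues_chain(n):
            continue  # n is absorbed into a chain that starts upstream
        chain = [n]
        while len(children[chain[-1]]) == 1 and continues_chain(children[chain[-1]][0]):
            chain.append(children[chain[-1]][0])
        chains.append(chain)
    return chains


# a -> b -> c, then c fans out to d and e, and d -> f.
children = {"a": ["b"], "b": ["c"], "c": ["d", "e"], "d": ["f"], "e": [], "f": []}
print(find_chains(children))  # [['a', 'b', 'c'], ['d', 'f'], ['e']]
```

Each resulting chain would then become one pipelined node, with edges between chains wherever the original graph had an edge crossing them.
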

pipelineGraphCreate(process_dict)

Convert a dictionary of pipelined workflow nodes to a pipelined workflow graph.

Parameters

process_dict – dictionary of pipelined workflow nodes.

recursive_add_node(node)

Adds the given list of workflow nodes to the workflow, along with all of their ancestors. If the node is already in the workflow, does not add duplicates.

Parameters

nodes – a list of workflow nodes.

remove(nodes)

Removes the given workflow nodes from the workflow, along with all of their descendants.

Parameters

nodes – a list of workflow nodes (or a single workflow node).
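
Removing nodes with their descendants is the mirror image of adding ancestors: a breadth-first walk over child links. In this sketch nodes are identified by name strings in a hypothetical children adjacency dict, and the function returns a new dict instead of mutating a workflow.

```python
from collections import deque


def remove(children, nodes):
    """Return a copy of the DAG without nodes and all of their descendants.

    children maps node name -> list of child names; nodes is a name or a list.
    """
    if isinstance(nodes, str):
        nodes = [nodes]
    doomed, queue = set(), deque(nodes)
    while queue:  # breadth-first walk over child links
        name = queue.popleft()
        if name not in doomed:
            doomed.add(name)
            queue.extend(children.get(name, []))
    return {
        name: [k for k in kids if k not in doomed]
        for name, kids in children.items()
        if name not in doomed
    }


children = {"a": ["b", "c"], "b": ["d"], "c": [], "d": []}
print(remove(children, "b"))  # {'a': ['c'], 'c': []}
```
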

run(backend='sequential', num_workers=1, monitor=False, from_scratch=False)

Run the workflow with the specified backend scheduler.

Parameters
  • backend (string) – The type of scheduling backend to use (sequential | multithread | multiprocess | pipeline | multithread_pipeline | multiprocess_pipeline). See mFlow.Workflow.scheduler for documentation.

  • num_workers (int) – Number of workers to use in parallel backends

  • monitor (bool) – If True, use the graphical execution monitor for Jupyter notebooks

  • from_scratch (bool) – If True, run the workflow from scratch, discarding any cached results.
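
Parallel backends such as multithread can start any node whose parents have finished, running up to num_workers at a time. The sketch below combines the incremental interface of graphlib.TopologicalSorter (prepare, get_ready, done, is_active) with concurrent.futures.ThreadPoolExecutor. The dag format and the run_multithread name are hypothetical, and mFlow's own backends may be organized differently.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter

# Hypothetical DAG format: node name -> (function, list of parent names).
dag = {
    "a": (lambda: 2, []),
    "b": (lambda: 3, []),
    "mul": (lambda x, y: x * y, ["a", "b"]),
}


def run_multithread(dag, num_workers=1):
    """Run ready nodes concurrently on a thread pool, parents before children."""
    sorter = TopologicalSorter({n: parents for n, (_, parents) in dag.items()})
    sorter.prepare()
    results, pending = {}, {}
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        while sorter.is_active():
            # Submit every node whose parents have all finished.
            for name in sorter.get_ready():
                func, parents = dag[name]
                pending[pool.submit(func, *(results[p] for p in parents))] = name
            # Block until at least one submitted node completes.
            finished, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in finished:
                name = pending.pop(fut)
                results[name] = fut.result()
                sorter.done(name)  # may make children ready
    return results


print(run_multithread(dag, num_workers=2)["mul"])  # 6
```

Here "a" and "b" have no dependencies, so with two workers they can run at the same time; "mul" is submitted only after both are marked done.
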

set_status(node, status)

Set the status of a workflow node

Parameters
  • node – a workflow node

  • status (string) – scheduled | notscheduled | running | done
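
The four documented status strings can be validated with an Enum so that a misspelled status fails loudly instead of being stored silently. Status and the dict-based set_status below are a hypothetical sketch, not the workflow method itself.

```python
from enum import Enum


class Status(str, Enum):
    """The four status values listed for set_status."""

    SCHEDULED = "scheduled"
    NOTSCHEDULED = "notscheduled"
    RUNNING = "running"
    DONE = "done"


def set_status(statuses, node, status):
    """Record a node's status; Status(...) raises ValueError for unknown strings."""
    statuses[node] = Status(status)


statuses = {}
set_status(statuses, "load", "running")
print(statuses["load"].value)  # running
```
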