mFlow Overview¶
mFlow is a Python module for specifying and executing machine learning experimentation workflows. It organizes both data transformations and common experimental procedures into workflow blocks. mFlow focuses on the multi-core parallel setting and interoperates with machine learning model implementations that follow the scikit-learn model class structure. mFlow can also interoperate with Apache Spark and the MD2K Cerebral Cortex library to split workflow execution across a distributed pre-processing and feature extraction phase followed by a multi-core parallel model estimation phase. mFlow uses Pandas dataframes as the primary data structure for representing data sets and results.
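A typical workflow wires block outputs into a graph and hands the graph to a scheduler backend. The sketch below is illustrative only: result_node stands in for a node produced by one of the block constructors in the mFlow.Blocks package (see the Examples directory for real block usage), while the workflow constructor and run call follow the API documented below.

from mFlow.Workflow.workflow import workflow

# result_node is a hypothetical workflow node assumed to have been
# produced by a block constructor from mFlow.Blocks.
wf = workflow({"results": result_node})       # tag the output node as "results"
wf.run(backend="multithread", num_workers=4)  # execute with 4 worker threads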
mFlow supports a range of experimental workflows with a focus on multi-level data, where individual data cases are nested within groups. Most workflows support partitioning data sets at either the instance level or the group level; the sketch after the list below illustrates the difference. Experimental designs currently supported by mFlow include:
Train/test performance assessment with instance-level partitioning
Train/test performance assessment with group-level partitioning
Cross-validation performance assessment with instance-level partitioning
Cross-validation performance assessment with group-level partitioning
Within-group train/test performance assessment with random partitioning within groups
Within-group train/test performance assessment with sequential partitioning within groups
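The difference between instance-level and group-level partitioning can be illustrated with plain scikit-learn splitters. The following is a standalone sketch for intuition only, not mFlow's API:

import numpy as np
from sklearn.model_selection import train_test_split, GroupShuffleSplit

# Toy multi-level data: 10 instances nested within 3 groups (e.g., users).
X = np.arange(20).reshape(10, 2)
y = np.arange(10) % 2
groups = np.array([0, 0, 0, 1, 1, 1, 1, 2, 2, 2])

# Instance-level partitioning: instances from the same group may appear
# in both the train and the test set.
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=0)

# Group-level partitioning: one whole group is held out, so the model is
# evaluated on instances from groups it never saw during training.
gss = GroupShuffleSplit(n_splits=1, test_size=1, random_state=0)
train_idx, test_idx = next(gss.split(X, y, groups=groups))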
mFlow also includes a library of example workflows illustrating the application of data transformations and experimental workflows on open data sets in the mobile health and activity recognition domains.
Installation¶
mFlow requires Python 3.6 or higher. To install mFlow using pip, clone this repository and run pip install:
git clone https://github.com/mlds-lab/mFlow.git
pip3 install ./mFlow
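If the installation succeeded, the import below should complete without error (assuming the package installs under the top-level mFlow name used throughout this documentation):

python3 -c "import mFlow"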
Examples¶
See the Examples directory for a list of mFlow examples that can be run locally or launched in Google Colab.
mFlow.Blocks Package¶
mFlow.Blocks.ccwrapper Module¶
mFlow.Blocks.data_loader_extrasensory Module¶
mFlow.Blocks.data_loader_wesad Module¶
mFlow.Blocks.experimental_protocol Module¶
mFlow.Blocks.filter Module¶
mFlow.Utilities Package¶
mFlow.Workflow Package¶
mFlow.Workflow.compute_graph Module¶
mFlow.Workflow.scheduler Module¶
- mFlow.Workflow.scheduler.run(flow, backend='sequential', num_workers=1, monitor=False, from_scratch=False)[source]¶
- mFlow.Workflow.scheduler.run_parallel(flow, data=None, backend='multithread', num_workers=1, monitor=False, from_scratch=False)[source]¶
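Both functions take a workflow graph as their first argument. A minimal sketch of a direct scheduler call, using only the parameters in the signatures above (flow is assumed to be an existing workflow instance; most code will instead use the workflow.run method documented below):

from mFlow.Workflow import scheduler

# Execute the graph with four worker threads, discarding any cached results.
scheduler.run(flow, backend="multithread", num_workers=4, from_scratch=True)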
mFlow.Workflow.workflow Module¶
- class mFlow.Workflow.workflow.workflow(nodes={})[source]¶
Bases: object
- __init__(nodes={})[source]¶
Workflow constructor
- Parameters
nodes (dict) – a dictionary of workflow output nodes. Keys are used as tags to identify outputs.
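For example, a workflow can start empty and have tagged outputs attached later with add_output:

from mFlow.Workflow.workflow import workflow

wf = workflow()  # no output nodes yet; see add_output below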
- add(nodes)[source]¶
Adds the given list of workflow nodes to the workflow, along with all of their ancestors. Sets the out_tag property of the nodes if not already set.
- Parameters
nodes – a list of workflow nodes (or a single workflow node).
- add_edge(node_from, node_to)[source]¶
Adds a directed edge between two nodes.
- Parameters
node_from – ID of the source node
node_to – ID of the destination node
- add_node(name, block)[source]¶
Adds a node to the workflow with a specified name and block.
- Parameters
name (str) – unique name for workflow node
block – the workflow block for this node
- add_output(node, tag)[source]¶
Adds the given workflow node to the set of workflow outputs with the given tag, then adds all of the node’s ancestors to the workflow.
- Parameters
node – a workflow node
tag – a string to identify the node
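For example, with result_node standing in for a hypothetical node produced by a block constructor:

wf.add_output(result_node, "results")  # hypothetical node; the tag string is arbitrary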
- add_pipeline_node(id, plNode)[source]¶
Adds a pipeline node to the pipelined workflow graph.
- Parameters
id (string) – id for the node
plNode – pipelined workflow node
- draw(refresh=True)[source]¶
Display the workflow graph in a Jupyter notebook.
- Parameters
refresh (bool) – If True, clear the cell before drawing.
- drawPipelined(refresh=True)[source]¶
Display the pipelined workflow graph in a Jupyter notebook.
- Parameters
refresh (bool) – If True, clear the cell before drawing.
- pipeline(initGraph)[source]¶
Convert the given workflow graph to a pipelined representation in which chains in the graph are replaced by a single node.
- Parameters
initGraph – A workflow graph.
- pipelineGraphCreate(process_dict)[source]¶
Convert a dictionary of pipelined workflow nodes to a pipelined workflow graph.
- Parameters
process_dict – dictionary of pipelined workflow nodes.
- recursive_add_node(node)[source]¶
Adds the given workflow node to the workflow, along with all of its ancestors. If the node is already in the workflow, it is not added again.
- Parameters
node – a workflow node.
- remove(nodes)[source]¶
Removes the given workflow nodes from the workflow, along with all of their descendants.
- Parameters
nodes – a list of workflow nodes (or a single workflow node).
- run(backend='sequential', num_workers=1, monitor=False, from_scratch=False)[source]¶
Run the workflow with the specified backend scheduler.
- Parameters
backend (string) – The type of scheduling backend to use (sequential | multithread | multiprocess | pipeline | multithread_pipeline | multiprocess_pipeline). See mFlow.Workflow.scheduler for documentation.
num_workers (int) – Number of workers to use in parallel backends
monitor (bool) – If True, use graphical execution monitoring in Jupyter notebooks.
from_scratch (bool) – If True, run the workflow from scratch, discarding any cached results.
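For example, using only the parameters documented above:

wf.run(backend="multiprocess_pipeline", num_workers=4, monitor=True)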
Contributors¶
See the list of contributors who participated in this project.
Acknowledgments¶
National Institutes of Health - Big Data to Knowledge Initiative Grant: 1U54EB020404
National Science Foundation Grant: 1823283