mFlow Overview

mFlow is a Python module for specifying and executing machine learning experimentation workflows. It organizes both data transformations and common experimental procedures into workflow blocks. mFlow focuses on the multi-core parallel setting and interoperates with machine learning model implementations that follow the scikit-learn model class structure. mFlow can also interoperate with Apache Spark and the MD2K Cerebral Cortex library to split workflow execution across a distributed pre-processing and feature extraction phase followed by a multi-core parallel model estimation phase. mFlow uses Pandas dataframes as the primary data structure for representing data sets and results.

mFlow supports a range of experimental workflows with a focus on multi-level data where individual data cases are nested within groups. It supports partitioning data sets both at the instance level and at the group level for most workflows. Experimental designs currently supported by mFlow include:

  • Train/Test performance assessment with instance-level partitioning

  • Train/Test performance assessment with group-level partitioning

  • Cross-validation performance assessment with instance-level partitioning

  • Cross-validation performance assessment with group-level partitioning

  • Within-group Train/Test performance assessment with random partitioning within groups

  • Within-group Train/Test performance assessment with sequential partitioning within groups

mFlow also includes a library of example workflows illustrating the application of data transformations and experimental workflows on open data sets in the mobile health and activity recognition domains.
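
The following minimal sketch shows the general shape of an mFlow workflow, in the style of the bundled examples. Block keyword arguments such as label, method, and train_size are assumptions here, since the block signatures documented below accept generic *args and **kwargs:

from sklearn.linear_model import LogisticRegression

from mFlow.Blocks.data_loader_extrasensory import extrasensory_data_loader
from mFlow.Blocks.filter import MisingDataColumnFilter, MisingLabelFilter
from mFlow.Blocks.imputer import Imputer
from mFlow.Blocks.normalizer import Normalizer
from mFlow.Blocks.experimental_protocol import ExpTrainTest
from mFlow.Workflow.workflow import workflow

# Each block call adds a node to the compute graph; nothing executes yet.
df = extrasensory_data_loader(label="SLEEPING")  # label kwarg is an assumption
df = MisingDataColumnFilter(df)                  # drop mostly-missing columns
df = MisingLabelFilter(df)                       # drop instances without labels
df = Imputer(df, method="mean")                  # method kwarg is an assumption
df = Normalizer(df)

# Estimators follow the scikit-learn interface; dict keys name the models.
models = {"Logistic Regression": LogisticRegression(max_iter=100)}
res = ExpTrainTest(df, models, train_size=0.2)   # train_size is an assumption

# Execution is deferred until run() is called on the workflow object.
flow = workflow({"results": res})
output = flow.run(backend="multithread", num_workers=4)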

Installation

mFlow requires Python 3.6 or higher. To install mFlow using pip, clone this repository and run pip install:

git clone https://github.com/mlds-lab/mFlow.git
pip3 install ./mFlow
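
To verify the installation, import the package:

python3 -c "import mFlow"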

Examples

See the Examples directory for a list of mFlow examples that can be run locally or launched in Google Colab.

mFlow.Blocks Package

mFlow.Blocks.ccwrapper Module

mFlow.Blocks.ccwrapper.cc_to_pandas(*args, **kwargs)[source]
mFlow.Blocks.ccwrapper.ccwrapper(*args, **kwargs)[source]

mFlow.Blocks.data_loader_extrasensory Module

mFlow.Blocks.data_loader_extrasensory.extrasensory_data_loader(**kwargs)[source]

mFlow.Blocks.data_loader_wesad Module

mFlow.Blocks.data_loader_wesad.wesad_data_loader(**kwargs)[source]
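
Both loaders presumably download and cache their respective public data sets (ExtraSensory and WESAD) and return a workflow node wrapping a Pandas dataframe. A hedged sketch; the label keyword argument is an assumption:

from mFlow.Blocks.data_loader_extrasensory import extrasensory_data_loader
from mFlow.Blocks.data_loader_wesad import wesad_data_loader

df_es = extrasensory_data_loader(label="SLEEPING")  # label kwarg is an assumption
df_ws = wesad_data_loader()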

mFlow.Blocks.experimental_protocol Module

mFlow.Blocks.experimental_protocol.ExpCV(*args, **kwargs)[source]
mFlow.Blocks.experimental_protocol.ExpTrainTest(*args, **kwargs)[source]
mFlow.Blocks.experimental_protocol.ExpWithin(*args, **kwargs)[source]
mFlow.Blocks.experimental_protocol.addTarget(*args, **kwargs)[source]
mFlow.Blocks.experimental_protocol.df_to_sk(df)[source]
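
ExpCV presumably mirrors ExpTrainTest but with a fold count, and ExpWithin implements the within-group designs listed in the overview. A hedged sketch of a cross-validation experiment; the label and folds keyword arguments are assumptions, since the documented signatures accept generic *args and **kwargs:

from sklearn.tree import DecisionTreeClassifier
from mFlow.Blocks.data_loader_extrasensory import extrasensory_data_loader
from mFlow.Blocks.experimental_protocol import ExpCV

df = extrasensory_data_loader(label="SLEEPING")  # label kwarg is an assumption
models = {"Decision Tree": DecisionTreeClassifier()}
res_cv = ExpCV(df, models, folds=5)              # folds kwarg is an assumption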

mFlow.Blocks.filter Module

mFlow.Blocks.filter.ColumnSelectFilter(*args, **kwargs)[source]
mFlow.Blocks.filter.MisingDataColumnFilter(*args, **kwargs)[source]
mFlow.Blocks.filter.MisingDataRowFilter(*args, **kwargs)[source]
mFlow.Blocks.filter.MisingLabelFilter(*args, **kwargs)[source]
mFlow.Blocks.filter.Take(*args, **kwargs)[source]
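
The filter blocks are typically chained between a data loader and an experimental protocol block. A hedged sketch; the threshold and count arguments below are assumptions, since the documented signatures accept generic **kwargs:

from mFlow.Blocks.data_loader_extrasensory import extrasensory_data_loader
from mFlow.Blocks.filter import (MisingDataColumnFilter, MisingDataRowFilter,
                                 MisingLabelFilter, Take)

df = extrasensory_data_loader(label="SLEEPING")  # label kwarg is an assumption
df = MisingDataColumnFilter(df, threshold=0.2)   # drop mostly-missing columns (threshold is an assumption)
df = MisingDataRowFilter(df, threshold=0.2)      # drop mostly-missing rows (threshold is an assumption)
df = MisingLabelFilter(df)                       # drop instances without labels
df = Take(df, 1000)                              # keep the first 1000 instances (count arg is an assumption)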

mFlow.Blocks.imputer Module

mFlow.Blocks.imputer.Imputer(*args, **kwargs)[source]

mFlow.Blocks.normalizer Module

mFlow.Blocks.normalizer.Normalizer(*args, **kwargs)[source]

mFlow.Blocks.results_analysis Module

mFlow.Blocks.results_analysis.DataYieldReport(*args, **kwargs)[source]
mFlow.Blocks.results_analysis.ResultsCVSummarize(*args, **kwargs)[source]
mFlow.Blocks.results_analysis.ResultsConcat(*args, **kwargs)[source]
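
Continuing the ExpCV sketch above, the results blocks might compose as follows. ResultsConcat presumably stacks per-fold result dataframes and ResultsCVSummarize presumably computes summary statistics across folds; both readings are assumptions based on the names:

from mFlow.Blocks.results_analysis import ResultsConcat, ResultsCVSummarize

res_all = ResultsConcat(res_cv)        # stack per-fold results (assumption)
summary = ResultsCVSummarize(res_all)  # summarize across folds (assumption)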

mFlow.Utilities Package

mFlow.Utilities.utilities Module

mFlow.Utilities.utilities.getCacheDir()[source]
mFlow.Utilities.utilities.getDataDir()[source]
mFlow.Utilities.utilities.getmFlowUserHome()[source]
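
These helpers presumably return the directories where mFlow keeps its user files, downloaded data sets, and cached workflow results:

from mFlow.Utilities.utilities import getCacheDir, getDataDir, getmFlowUserHome

print(getmFlowUserHome())  # base directory for mFlow files (assumption)
print(getDataDir())        # location of downloaded data sets (assumption)
print(getCacheDir())       # location of cached workflow results (assumption)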

mFlow.Workflow Package

mFlow.Workflow.compute_graph Module

class mFlow.Workflow.compute_graph.node(function=None, args=[], kwargs={}, name=None, parents=[])[source]

Bases: object

__init__(function=None, args=[], kwargs={}, name=None, parents=[])[source]
get_args()[source]
get_kwargs()[source]
run()[source]
class mFlow.Workflow.compute_graph.pipelineNode(initGraph, node_list, id)[source]

Bases: object

__init__(initGraph, node_list, id)[source]
run()[source]
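
Workflow blocks appear to be built on this node class: a node records a function together with the arguments it should be applied to, and run() presumably invokes that function. A minimal sketch of direct construction; the exact run() semantics are an assumption:

import pandas as pd
from mFlow.Workflow.compute_graph import node

def load():
    # Stand-in for a data loading block
    return pd.DataFrame({"x": [1, 2, 3]})

n = node(function=load, name="loader")
result = n.run()  # presumably calls load() and returns its dataframe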

mFlow.Workflow.scheduler Module

mFlow.Workflow.scheduler.run(flow, backend='sequential', num_workers=1, monitor=False, from_scratch=False)[source]
mFlow.Workflow.scheduler.run_parallel(flow, data=None, backend='multithread', num_workers=1, monitor=False, from_scratch=False)[source]
mFlow.Workflow.scheduler.run_parallel_pipeline(flow, data=None, backend='multithread_pipeline', num_workers=1, monitor=False, from_scratch=False, refresh_rate=0.05)[source]
mFlow.Workflow.scheduler.run_pipeline(flow, data=None, monitor=False, from_scratch=False)[source]
mFlow.Workflow.scheduler.run_sequential(flow, data=None, monitor=False, from_scratch=False)[source]
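
run appears to dispatch to one of the backend-specific variants above, so the scheduler can also be invoked directly instead of going through workflow.run. A minimal sketch, assuming flow is a workflow instance:

import mFlow.Workflow.scheduler as scheduler

# Presumably equivalent to flow.run(backend="multithread", num_workers=4)
output = scheduler.run(flow, backend="multithread", num_workers=4)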

mFlow.Workflow.workflow Module

class mFlow.Workflow.workflow.workflow(nodes={})[source]

Bases: object

__init__(nodes={})[source]

Workflow constructor

Parameters
  • nodes (dict) – a dictionary of workflow output nodes. Keys are used as tags to identify outputs.

add(nodes)[source]

Adds the given list of workflow nodes to the workflow, along with all of their ancestors. Sets the out_tag property of the nodes if not already set.

Parameters

nodes – a list of workflow nodes (or a single workflow node).

add_edge(node_from, node_to)[source]

Adds a directed edge between two nodes in the workflow graph.

Parameters
  • node_from – ID of the source node

  • node_to – ID of the destination node

add_node(name, block)[source]

Adds a node to the workflow with a specified name and block.

Parameters
  • name (str) – unique name for workflow node

  • block – the workflow block for this node

add_output(node, tag)[source]

Adds the given workflow node to the set of workflow outputs with the given tag, then adds all of the node’s ancestors to the workflow.

Parameters
  • node – a workflow node

  • tag – a string to identify the node

add_pipeline_node(id, plNode)[source]

Adds a pipeline node to the pipelined workflow graph.

Parameters
  • id (string) – ID for the pipeline node

  • plNode – a pipelined workflow node

draw(refresh=True)[source]

Displays the workflow graph in a Jupyter notebook.

Parameters

refresh (bool) – If True, clear the cell before drawing.

drawPipelined(refresh=True)[source]

Displays the pipelined workflow graph in a Jupyter notebook.

Parameters

refresh (bool) – If True, clear the cell before drawing.

pipeline(initGraph)[source]

Converts the given workflow graph to a pipelined representation in which chains in the workflow graph are replaced by single nodes.

Parameters

initGraph – A workflow graph.

pipelineGraphCreate(process_dict)[source]

Convert a dictionary of pipelined workflow nodes to a pipelined workflow graph.

Parameters

process_dict – dictionary of pipelined workflow nodes.

recursive_add_node(node)[source]

Adds the given workflow node to the workflow, along with all of its ancestors. If the node is already in the workflow, it is not added again.

Parameters

node – a workflow node.

remove(nodes)[source]

Removes the given workflow nodes from the workflow, along with all of their descendants.

Parameters

nodes – a list of workflow nodes (or a single workflow node).

run(backend='sequential', num_workers=1, monitor=False, from_scratch=False)[source]

Run the workflow with the specified backend scheduler.

Parameters
  • backend (string) – The type of scheduling backend to use (sequential | multithread | multiprocess | pipeline | multithread_pipeline | multiprocess_pipeline). See mFlow.Workflow.scheduler for documentation.

  • num_workers (int) – Number of workers to use in parallel backends.

  • monitor (bool) – If True, use graphical execution monitoring for Jupyter notebooks.

  • from_scratch (bool) – If True, run the workflow from scratch, discarding any cached results.

set_status(node, status)[source]

Sets the status of a workflow node.

Parameters
  • node – a workflow node

  • status (string) – scheduled | notscheduled | running | done

Contributors

See the list of contributors who participated in this project.

License

This project is licensed under the BSD 2-Clause License; see the LICENSE file for details.
