Source code for mFlow.Blocks.ccwrapper

import sys, os
from mFlow.Workflow.compute_graph import node
from mFlow.Utilities.utilities import getCacheDir
import pandas as pd
import time

def ccwrapper(*args, **kwargs):
    """Create a compute-graph node that runs a user function on a dataframe.

    ``args[0]`` is the function to wrap. All positional arguments are stored
    on the node and later passed to ``__ccwrapper`` together with the
    remaining keyword arguments.

    The optional ``name`` keyword sets the node's name and is removed from the
    keyword arguments before they are stored. When it is absent, the wrapped
    function's ``__name__`` is used.

    Returns:
        The ``node`` object built by ``mFlow.Workflow.compute_graph.node``.
    """
    wrapped = args[0]
    node_name = kwargs.pop("name") if "name" in kwargs else wrapped.__name__
    return node(function=__ccwrapper, args=args, kwargs=kwargs, name=node_name)
def __ccwrapper(f, df, sort_field=None, **kwargs):
    """Apply ``f`` to the wrapped dataframe, optionally sorting the result.

    Args:
        f: Callable invoked as ``f(dataframe, **kwargs)``.
        df: Dict holding the input dataframe under the ``"dataframe"`` key.
        sort_field: When not ``None``, the result's ``sort`` method is called
            with this value (presumably a Spark DataFrame column name -- confirm).
        **kwargs: Extra keyword arguments forwarded to ``f``.

    Returns:
        dict: ``{"dataframe": result}``.
    """
    result = f(df["dataframe"], **kwargs)
    if sort_field is None:
        return {"dataframe": result}
    return {"dataframe": result.sort(sort_field)}
def cc_to_pandas(*args, **kwargs):
    """Create a compute-graph node that converts a dataframe to pandas.

    Positional and keyword arguments are stored on the node and later passed
    to ``__cc_to_pandas``. The optional ``name`` keyword sets the node's name
    (default ``"cc_to_pandas"``) and is removed from the stored keyword
    arguments.

    Returns:
        The ``node`` object built by ``mFlow.Workflow.compute_graph.node``.
    """
    node_name = kwargs.pop("name", "cc_to_pandas")
    return node(function=__cc_to_pandas, args=args, kwargs=kwargs, name=node_name)
def __cc_to_pandas(df, participant_field=None, key="dataframe", datetime_field=None,
                   time_trunc="1T", cache_filename=None):
    """Convert a wrapped dataframe to pandas, indexed by ("ID", "Time").

    Args:
        df: Dict holding the input dataframe under ``key``. The stored object
            must provide ``toPandas()``.
        participant_field: Column used as the first index level ("ID").
        key: Key in ``df`` under which the input dataframe is stored.
        datetime_field: Column used as the second index level ("Time").
        time_trunc: Frequency passed to ``floor(freq=...)`` on every value of
            ``datetime_field``. ``None`` skips truncation.
            NOTE(review): the "T" alias is deprecated in recent pandas in
            favour of "min"; callers may prefer passing "1min".
        cache_filename: Optional file name inside ``getCacheDir()``. When given
            and the file exists, the pickled dataframe is loaded and returned
            without converting ``df``. Otherwise the converted result is
            pickled to that path. ``None`` disables caching.

    Returns:
        dict: ``{"dataframe": pandas.DataFrame}`` with a two-level
        ("ID", "Time") index.
    """
    # Resolve the cache path only when caching is requested. Previously
    # os.path.join(cache_dir, None) raised TypeError for the default call,
    # which also made the "is not None" guards below always true.
    cache_file_path = None
    if cache_filename is not None:
        cache_file_path = os.path.join(getCacheDir(), cache_filename)
        if os.path.exists(cache_file_path):
            # NOTE: unpickling can execute arbitrary code; the cache dir must be trusted.
            return {"dataframe": pd.read_pickle(cache_file_path)}

    pdf = df[key].toPandas()

    if time_trunc is not None:
        pdf[datetime_field] = pdf[datetime_field].apply(
            lambda ts: ts.floor(freq=time_trunc)
        )

    pdf = pdf.set_index([participant_field, datetime_field])
    pdf.index.names = ["ID", "Time"]

    if cache_file_path is not None:
        pdf.to_pickle(cache_file_path)

    return {"dataframe": pdf}