Source code for mFlow.Blocks.data_loader_wesad

import sys, os
from mFlow.Workflow.compute_graph import node
from mFlow.Utilities.utilities import getDataDir

import requests, zipfile, io, tarfile
import glob
import pandas as pd
import time
from cerebralcortex.kernel import Kernel
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from cerebralcortex.core.datatypes.datastream import DataStream

#def wesad_data_loader(**kwargs):
#    return (node(function = __wesad_data_loader, kwargs=kwargs, name="WESAD Data Loader"),
#            node(function = __wesad_label_loader, kwargs=kwargs, name="WESAD Label Loader"))

def wesad_data_loader(**kwargs):
    return node(function = __wesad_data_loader, kwargs=kwargs, name="WESAD Data Loader")
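
# Illustrative sketch: wesad_data_loader() above only wraps the private
# __wesad_data_loader function in an mFlow compute-graph node and forwards any
# keyword arguments to it, so a workflow script could build the loader block as,
# for example,
#
#     wesad_node = wesad_data_loader(data_size="small")
#
# How that node is scheduled and executed is handled by mFlow's Workflow
# machinery, which is assumed here and not defined in this module; when the
# node's function runs, it returns the {"dataframe": ..., "labels": ...}
# dictionary produced by __wesad_data_loader below.
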
def __wesad_data_download(data_size="all"):

    base_data_dir = getDataDir()
    compressed_data_dir = os.path.join(base_data_dir,"compressed")
    wesad_data_dir = os.path.join(base_data_dir,"wesad_"+data_size)
    wesad_cc_dir = os.path.join(wesad_data_dir,"cc_data")

    if(data_size=="all"):
        compressed_file="cc_data.tar.bz2"
    elif(data_size=="small"):
        compressed_file="s2_data.tar.bz2"
    else:
        raise ValueError("Error: data_size must be 'small' or 'all'")

    compressed_data_url="http://mhealth.md2k.org/images/datasets/%s"%compressed_file
    compressed_data_local_file = os.path.join(compressed_data_dir,compressed_file)

    #Make all data directories
    for d in [base_data_dir, compressed_data_dir, wesad_data_dir]:
        if(not os.path.exists(d)):
            os.mkdir(d)

    #Check if cc data store exists:
    if(not os.path.exists(wesad_cc_dir)):

        #Check if compressed cc version of the data set exists
        if(not os.path.isfile(compressed_data_local_file)):

            print("Downloading WESAD data archive to %s "%compressed_data_local_file,end="")

            #Download the compressed data file
            response = requests.get(compressed_data_url, stream=True)
            response.raise_for_status()
            with open(compressed_data_local_file , 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024*1024):
                    f.write(chunk)
                    print(".",end="",flush=True)
            print("")
            print("Done")

        #Extract the data set
        print("Extracting WESAD data archive to %s ..."%wesad_cc_dir,end="",flush=True)
        tar = tarfile.open(compressed_data_local_file)
        tar.extractall(wesad_data_dir)
        tar.close()
        print(" Done")


def __wesad_data_loader(data_size="small"):

    #Download the data if needed
    base_data_dir = getDataDir()
    compressed_data_dir = os.path.join(base_data_dir,"compressed")
    wesad_data_dir = os.path.join(base_data_dir,"wesad_"+data_size)
    wesad_cc_dir = os.path.join(wesad_data_dir,"cc_data")

    if(not os.path.exists(wesad_cc_dir)):
        __wesad_data_download(data_size=data_size)

    #Create CC kernel to load WESAD data
    cc_configs = {"nosql_storage": "filesystem",
                  "filesystem": {"filesystem_path": wesad_cc_dir},
                  "relational_storage": "sqlite",
                  "sqlite": {"file_path": wesad_cc_dir},
                  }
    CC = Kernel(cc_configs=cc_configs, study_name="wesad")
    CC.sqlContext.sql("set spark.sql.shuffle.partitions=1")

    #Get the data
    data = CC.get_stream("wesad.chest.ecg")
    if(data_size=="small"):
        data = data.filter_user(["s2"])

    #Get and process the labels
    labels = CC.get_stream("wesad.label")
    if(data_size=="small"):
        labels = labels.filter_user(["s2"])
    elif(data_size=="all"):
        pass
    else:
        raise ValueError("Error: data_size must be 'small' or 'all'")

    #Data set documentation indicates removing label 0 and 5,6,7
    #for this data set
    labels = labels.filter("label > 0")
    labels = labels.filter("label < 5")

    #Convert problem to stress/not stress with stress=1, not stress=0
    labels = labels.replace({1:0, 2:1, 3:0, 4:0}, [], subset=["label"])

    #Window label data into 60 second chunks
    win = F.window("timestamp", windowDuration="60 seconds", startTime='0 seconds')
    windowed_labels = labels.groupby(["user","version",win]).agg(F.collect_list("label"))

    #Label each window by majority vote over the labels it contains
    label_window_udf = udf(lambda s: max(set(s), key=s.count), IntegerType())
    final_labels = DataStream(windowed_labels.withColumn("target", label_window_udf(F.col("collect_list(label)"))).drop("collect_list(label)"))
    final_labels = DataStream(final_labels.withColumn("timestamp", final_labels["window"].start).drop("version").drop("window"))

    return({"dataframe": data, "labels": final_labels})


def __wesad_label_loader(data_size="small"):

    CC = Kernel(cc_configs="default", study_name="wesad")
    CC.sqlContext.sql("set spark.sql.shuffle.partitions=1")

    #Get and process the labels
    labels = CC.get_stream("wesad.label")
    if(data_size=="small"):
        labels = labels.filter_user(["s2"])
    elif(data_size=="all"):
        pass
    else:
        raise ValueError("Error: data_size must be 'small' or 'all'")

    #Data set documentation indicates removing label 0 and 5,6,7
    #for this data set
    labels = labels.filter("label > 0")
    labels = labels.filter("label < 5")

    #Convert problem to stress/not stress with stress=1, not stress=0
    labels = labels.replace({1:0, 2:1, 3:0, 4:0}, [], subset=["label"])

    #Window label data into 60 second chunks
    win = F.window("timestamp", windowDuration="60 seconds", startTime='0 seconds')
    windowed_labels = labels.groupby(["user","version",win]).agg(F.collect_list("label"))

    #Label each window by majority vote over the labels it contains
    label_window_udf = udf(lambda s: max(set(s), key=s.count), IntegerType())
    final_labels = DataStream(windowed_labels.withColumn("target", label_window_udf(F.col("collect_list(label)"))).drop("collect_list(label)"))
    final_labels = DataStream(final_labels.withColumn("timestamp", final_labels["window"].start).drop("version").drop("window"))

    return({"dataframe": final_labels})