python - Split time-series dataframe


I have a dataframe with different parameters as columns and a timestamp for each row of parameters.

What I want is to split the dataframe into windows, where the column values from each row in a window are appended into a single row. That will enable me to run clustering using these features.

For example, I want to transform a dataframe like this (with window size 3):

2017-01-01 00:00:01, a1, b1, c1
2017-01-01 00:00:02, a2, b2, c2
2017-01-01 00:00:03, a3, b3, c3
2017-01-01 00:00:04, a4, b4, c4
2017-01-01 00:00:05, a5, b5, c5
2017-01-01 00:00:06, a6, b6, c6
2017-01-01 00:00:07, a7, b7, c7

into this:

2017-01-01 00:00:01, 2017-01-01 00:00:03, a1, a2, a3, b1, b2, b3, c1, c2, c3
2017-01-01 00:00:04, 2017-01-01 00:00:06, a4, a5, a6, b4, b5, b6, c4, c5, c6

I need to preserve the information about which time interval belongs to which cluster after clustering, which is why I need to keep the time ranges. The last instant in the example is excluded because there are not enough measurements to create another window.

How can I do that using Spark?

Let's start with some data, according to your description:

from pyspark.sql.functions import unix_timestamp

df = sc.parallelize([("2017-01-01 00:00:01", 2.0, 2.0, 2.0),
                     ("2017-01-01 00:00:08", 9.0, 9.0, 9.0),
                     ("2017-01-01 00:00:02", 3.0, 3.0, 3.0),
                     ("2017-01-01 00:00:03", 4.0, 4.0, 4.0),
                     ("2017-01-01 00:00:04", 5.0, 5.0, 5.0),
                     ("2017-01-01 00:00:05", 6.0, 6.0, 6.0),
                     ("2017-01-01 00:00:06", 7.0, 7.0, 7.0),
                     ("2017-01-01 00:00:07", 8.0, 8.0, 8.0)]).toDF(["time", "a", "b", "c"])

df = df.withColumn("time", unix_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))

Spark 2.0

We generate a new interval column using the ceil() function, so that we can group the data and collect the other variables into one flat list.

To guarantee correct ordering inside the resulting lists, irrespective of the initial order, we'll use window functions, partitioning the data by date and creating a rank column ordered by time.

from pyspark.sql.window import Window
from pyspark.sql.functions import ceil

df = df.withColumn("date", df["time"].cast("date")) \
       .withColumn("interval", ((ceil(df["time"].cast("long") / 3)) * 3.0).cast("timestamp"))

window = Window.partitionBy(df['date']).orderBy(df['time'])
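To see why this buckets rows into 3-second windows, look at what ceil(t / 3) * 3 does to consecutive unix timestamps: the first three seconds all map to the same value, the timestamp that ends their window. A quick plain-Python check (the epoch values assume a UTC session timezone, so they are illustrative):

import math

# epoch seconds for 2017-01-01 00:00:01 .. 00:00:04 (UTC)
for t in (1483228801, 1483228802, 1483228803, 1483228804):
    print(t, math.ceil(t / 3.0) * 3.0)
# 00:00:01, 00:00:02 and 00:00:03 all map to 1483228803.0 (i.e. 00:00:03),
# while 00:00:04 starts the next window ending at 1483228806.0 (00:00:06)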

Because we'll collect the rank column into the nested list to get correct ordering, we'll define a udf that unpacks the values in the nested lists, dropping the first one, the rank:

from pyspark.sql.functions import udf

def unnest(col):
    # drop the rank (first element) from each inner list
    l = [item[1:] for item in col]
    # flatten the remaining values into a single list
    res = [item for sublist in l for item in sublist]
    return(res)

unnest_udf = udf(unnest)
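As a quick sanity check of the plain-Python logic, the values below are illustrative stand-ins for what collect_list(array("rank", "a", "b", "c")) would produce for one interval; the function drops the leading rank from each inner list and flattens the rest:

sample = [[1.0, 2.0, 2.0, 2.0], [2.0, 3.0, 3.0, 3.0], [3.0, 4.0, 4.0, 4.0]]
print(unnest(sample))
# [2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]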

Now putting it all together:

from pyspark.sql.functions import rank
from pyspark.sql.functions import collect_list, array

df.select('*', rank().over(window).alias('rank')) \
  .groupBy("interval") \
  .agg(collect_list(array("rank", "a", "b", "c")).alias("vals")) \
  .withColumn("vals", unnest_udf("vals")) \
  .sort("interval") \
  .show(truncate = False)
+---------------------+---------------------------------------------+
|interval             |vals                                         |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 7.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 8.0, 8.0, 9.0, 9.0, 9.0]               |
+---------------------+---------------------------------------------+
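The question also asked for windows without enough measurements (like the last interval above) to be dropped. A minimal sketch of one way to do that, assuming the grouped result is kept in a dataframe (given the hypothetical name windowed here) instead of being shown directly; size() counts the collected rows per interval before the lists are flattened:

from pyspark.sql.functions import size

# keep only complete windows (3 rows each), as required in the question
windowed = df.select('*', rank().over(window).alias('rank')) \
    .groupBy("interval") \
    .agg(collect_list(array("rank", "a", "b", "c")).alias("vals")) \
    .filter(size("vals") == 3) \
    .withColumn("vals", unnest_udf("vals"))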

Spark 1.6

We cannot use array as an argument inside collect_list(), so we'll wrap the collect_list() calls inside array(), instead of the other way around. We'll also slightly modify our udf, because we won't explicitly need the rank column with this approach.

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist]
)

df.select('*', rank().over(window).alias('rank')) \
  .groupBy("interval") \
  .agg(array(collect_list("a"),
             collect_list("b"),
             collect_list("c")).alias("vals")) \
  .withColumn("vals", unpack_udf("vals")) \
  .sort("interval") \
  .show(truncate = False)
+---------------------+---------------------------------------------+
|interval             |vals                                         |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 3.0, 4.0, 2.0, 3.0, 4.0, 2.0, 3.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 6.0, 7.0, 5.0, 6.0, 7.0, 5.0, 6.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 9.0, 8.0, 9.0, 8.0, 9.0]               |
+---------------------+---------------------------------------------+

Note that the vals column is now ordered in a different way, yet consistently, thanks to the window function we defined earlier.
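Since the end goal in the question was clustering, here is a minimal sketch of how the result could be fed into pyspark.ml's KMeans (Spark 2.0+). It assumes the complete windows were kept in a dataframe named windowed as in the sketch above (filtering matters, because KMeans needs vectors of equal length), and that unnest_udf was registered with an explicit return type, e.g. udf(unnest, ArrayType(DoubleType())), so that vals is an array of doubles rather than a string:

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# turn each flattened window into an ML vector column called "features"
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
features = windowed.withColumn("features", to_vector("vals"))

kmeans = KMeans(k=2, seed=1)   # k=2 is an arbitrary, illustrative choice
model = kmeans.fit(features)

# "interval" identifies the time window, "prediction" the assigned cluster
model.transform(features).select("interval", "prediction").show()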

