python - Split time-series dataframe
I have a dataframe with several parameter columns and a timestamp for each row of parameters.
What I want is to split the dataframe into windows, with the column values of each row in a window appended into a single row. This will enable me to run clustering using these features.
For example, I want to transform this dataframe (window size 3):
2017-01-01 00:00:01, a1, b1, c1
2017-01-01 00:00:02, a2, b2, c2
2017-01-01 00:00:03, a3, b3, c3
2017-01-01 00:00:04, a4, b4, c4
2017-01-01 00:00:05, a5, b5, c5
2017-01-01 00:00:06, a6, b6, c6
2017-01-01 00:00:07, a7, b7, c7
into this:
2017-01-01 00:00:01, 2017-01-01 00:00:03, a1, a2, a3, b1, b2, b3, c1, c2, c3
2017-01-01 00:00:04, 2017-01-01 00:00:06, a4, a5, a6, b4, b5, b6, c4, c5, c6
I need to preserve the information about which time interval belongs to which cluster after the clustering, which is why I need to keep the time ranges. The last instant in the example is excluded because there are not enough measurements left to create a full window.
How can I do this using Spark?
Let's start with some data, according to your description:
from pyspark.sql.functions import unix_timestamp

df = sc.parallelize([("2017-01-01 00:00:01", 2.0, 2.0, 2.0),
                     ("2017-01-01 00:00:08", 9.0, 9.0, 9.0),
                     ("2017-01-01 00:00:02", 3.0, 3.0, 3.0),
                     ("2017-01-01 00:00:03", 4.0, 4.0, 4.0),
                     ("2017-01-01 00:00:04", 5.0, 5.0, 5.0),
                     ("2017-01-01 00:00:05", 6.0, 6.0, 6.0),
                     ("2017-01-01 00:00:06", 7.0, 7.0, 7.0),
                     ("2017-01-01 00:00:07", 8.0, 8.0, 8.0)]).toDF(["time", "a", "b", "c"])

df = df.withColumn("time", unix_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
Spark > 2.0
We generate a new interval column using the ceil() function, which we can then group the data by to collect the other variables into one flat list. To guarantee correct ordering inside the resulting lists, irrespective of the initial order, we'll use window functions, partitioning the data by date and creating a rank column ordered by time.
from pyspark.sql.window import Window
from pyspark.sql.functions import ceil

df = df.withColumn("date", df["time"].cast("date")) \
       .withColumn("interval", (ceil(df["time"].cast("long") / 3) * 3.0).cast("timestamp"))

window = Window.partitionBy(df['date']).orderBy(df['time'])
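For intuition, the interval expression simply rounds each epoch second up to the nearest multiple of the window length, so every group of three consecutive seconds shares the same right edge. Here is a plain-Python sketch of that arithmetic (not Spark code; small numbers are used purely for illustration):

import math

def bucket(epoch_seconds, width=3):
    # ceil(seconds / width) * width = right edge of the window the row falls into,
    # which is exactly what the "interval" column above computes
    return int(math.ceil(epoch_seconds / float(width))) * width

print([bucket(s) for s in (1, 2, 3)])   # [3, 3, 3]
print([bucket(s) for s in (4, 5, 6)])   # [6, 6, 6]
print([bucket(s) for s in (7, 8)])      # [9, 9]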
Because we'll collect the rank column into the nested lists to get the correct ordering, we'll define a udf that unpacks the values in the nested lists while dropping the first element, the rank:
from pyspark.sql.functions import udf

def unnest(col):
    # drop the first element (the rank) from each nested list
    l = [item[1:] for item in col]
    # flatten the remaining lists of values into one flat list
    res = [item for sublist in l for item in sublist]
    return(res)

unnest_udf = udf(unnest)
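As a quick sanity check, this is what the plain Python function does to a collected nested list, where each inner list stands for one array("rank", "a", "b", "c") entry (values taken from the sample data above):

sample = [[1, 2.0, 2.0, 2.0],
          [2, 3.0, 3.0, 3.0],
          [3, 4.0, 4.0, 4.0]]

print(unnest(sample))
# [2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]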
Now, putting it all together:
from pyspark.sql.functions import rank
from pyspark.sql.functions import collect_list, array

df.select('*', rank().over(window).alias('rank')) \
  .groupBy("interval") \
  .agg(collect_list(array("rank", "a", "b", "c")).alias("vals")) \
  .withColumn("vals", unnest_udf("vals")) \
  .sort("interval") \
  .show(truncate = False)

+---------------------+---------------------------------------------+
|interval             |vals                                         |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 7.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 8.0, 8.0, 9.0, 9.0, 9.0]               |
+---------------------+---------------------------------------------+
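The question also asks to keep the start and end of each window. That part is not covered by the snippet above, but one way to get it, using only standard aggregate functions, is to collect the minimum and maximum time alongside the values (a sketch, assuming the same df, window and unnest_udf as above):

import pyspark.sql.functions as F

# additionally keep the first and last timestamp of each window,
# matching the "start, end, values..." layout from the question
df.select('*', F.rank().over(window).alias('rank')) \
  .groupBy("interval") \
  .agg(F.min("time").alias("window_start"),
       F.max("time").alias("window_end"),
       F.collect_list(F.array("rank", "a", "b", "c")).alias("vals")) \
  .withColumn("vals", unnest_udf("vals")) \
  .sort("interval") \
  .show(truncate=False)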
Spark 1.6
We cannot use array as an argument inside collect_list() here, so we'll wrap the collect_list() calls inside array instead of the other way around. We'll also slightly modify our udf, because we won't explicitly need the rank column with this approach.
unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist]
)

df.select('*', rank().over(window).alias('rank')) \
  .groupBy("interval") \
  .agg(array(collect_list("a"),
             collect_list("b"),
             collect_list("c")).alias("vals")) \
  .withColumn("vals", unpack_udf("vals")) \
  .sort("interval") \
  .show(truncate = False)

+---------------------+---------------------------------------------+
|interval             |vals                                         |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 3.0, 4.0, 2.0, 3.0, 4.0, 2.0, 3.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 6.0, 7.0, 5.0, 6.0, 7.0, 5.0, 6.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 9.0, 8.0, 9.0, 8.0, 9.0]               |
+---------------------+---------------------------------------------+
Note that the vals column is now ordered in a different way, yet consistently so, thanks to the window function we defined earlier.
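Finally, since the stated goal is clustering while keeping track of the time ranges, a possible next step could look like the sketch below. This is only an assumption on my part, not part of the answer above: it presumes Spark 2.x, a result DataFrame called windowed that contains only complete windows (so every vals list has the same length), and that the unpacking udf was declared with ArrayType(DoubleType()) as its return type, because a bare udf(...) returns strings by default.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import KMeans

# hypothetical helper: turn each flat list of doubles into an MLlib feature vector
to_vector = udf(lambda vals: Vectors.dense(vals), VectorUDT())

features = windowed.withColumn("features", to_vector("vals"))

# cluster the windows; "interval" stays attached to every row, so each
# cluster label can be traced back to its time range afterwards
model = KMeans(k=2, seed=1, featuresCol="features").fit(features)
clustered = model.transform(features).select("interval", "prediction")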