apache-flink: sliding window in output -
i'm coding small application understand sliding windowing in flink (with data input apache-kafka topic):
//split kafka stream comma , create tuple datastream<tuple3<string, integer, date>> parsedstream = stream .map((line) -> { string[] cells = line.split(","); return new tuple3(cells[1], integer.parseint(cells[4]), f.parse(cells[2])); }); datastream<tuple3<string, integer, date>> parsedstreamwithtswm = parsedstream .assigntimestampsandwatermarks(new boundedoutofordernesstimestampextractor<tuple3<string, integer, date>>(time.minutes(1)) { @override public long extracttimestamp(tuple3<string, integer, date> element) { return element.f2.gettime(); } }); //sum values per windows , per id datastream<tuple3<string, integer, date>> aggstream = parsedstreamwithtswm .keyby(0) .window(slidingeventtimewindows.of(time.minutes(30), time.minutes(1))) .sum(1); aggstream.print();
is possible improve output (aggstream.print();) adding window details produce aggregation output ?
$ tail -f flink-chapichapo-jobmanager-0.out (228035740000002,300,fri apr 07 14:42:00 cest 2017) (228035740000000,28,fri apr 07 14:42:00 cest 2017) (228035740000002,300,fri apr 07 14:43:00 cest 2017) (228035740000000,27,fri apr 07 14:43:00 cest 2017) (228035740000002,300,fri apr 07 14:44:00 cest 2017) (228035740000000,26,fri apr 07 14:44:00 cest 2017) (228035740000001,27,fri apr 07 14:44:00 cest 2017) (228035740000002,300,fri apr 07 14:45:00 cest 2017) (228035740000000,25,fri apr 07 14:45:00 cest 2017)
thank in advance
you can use generic function apply
have access window
info.
public interface windowfunction<in, out, key, w extends window> extends function, serializable { /** * evaluates window , outputs none or several elements. * * @param key key window evaluated. * @param window window being evaluated. * @param input elements in window being evaluated. * @param out collector emitting elements. * * @throws exception function may throw exceptions fail program , trigger recovery. */ void apply(key key, w window, iterable<in> input, collector<out> out) throws exception; }
see docs
Comments
Post a Comment