Spark Structured Streaming with Python
I am trying Spark Structured Streaming with Kafka and Python. Requirement: I need to process streaming data from Kafka (in JSON format) in Spark (perform transformations) and store it in a database.
I have data in JSON format like, {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}
I am planning to use spark.readStream for reading from Kafka like,

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
I referred to this link for reference but didn't figure out how to parse the JSON data. I tried this,
data = data.selectExpr("CAST(a AS FLOAT)", "CAST(b AS FLOAT)", "CAST(name AS STRING)", "CAST(time AS STRING)").as[(Float, Float, String, String)]
but it looks like it doesn't work.
Can anyone who has worked on Spark Structured Streaming with Python guide me on how to proceed, with sample examples or links?
Using,
schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("append").format("console").start()
The program runs, but I am getting values on the console as,
+-----------------------------------+
|jsontostruct(cast(value as string))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : { "test" : { "0" : 366 } },
    "endOffset" : { "test" : { "0" : 368 } },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  }
}
Did I miss something here?
You can either use from_json with a schema:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))
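If it helps, here is a minimal sketch of flattening the parsed struct into top-level columns before writing out. It assumes data is the Kafka source DataFrame from the question, and the names parsed, flat and query are just placeholders:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

# Parse the Kafka value (binary -> string -> struct) and give the struct a name
parsed = data.select(from_json(col("value").cast("string"), schema).alias("parsed"))

# Expand the struct into individual columns so they can be used directly
flat = parsed.select("parsed.*")

# Write to the console sink, as in the question
query = flat.writeStream.outputMode("append").format("console").start()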
or extract individual fields as strings with get_json_object:
from pyspark.sql.functions import col, get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])
and cast them later according to your needs.
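For example, a rough sketch of that later cast step; the column names and target types simply mirror the JSON sample in the question, so adjust them to whatever your downstream database expects:

from pyspark.sql.functions import col, get_json_object

# Every column returned by get_json_object is a string at this point
strings = data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

# Cast to the types expected downstream
typed = strings.select(
    col("a").cast("double"),
    col("b").cast("double"),
    col("name"),
    col("time").cast("timestamp"))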