Spark Structured Streaming with Python


I am trying out Spark Structured Streaming with Kafka and Python. Requirement: I need to process streaming data from Kafka (in JSON format) in Spark (perform transformations) and then store it in a database.

I have data in JSON format like this: {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

I am planning to use spark.readStream for reading from Kafka, like this:

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()

I referred to this link for reference, but didn't find how to parse the JSON data. I tried this:

data = data.selectExpr("CAST(a AS FLOAT)", "CAST(b AS FLOAT)", "CAST(name AS STRING)", "CAST(time AS STRING)").as[(Float, Float, String, String)]

but it looks like it doesn't work.

Can anyone who has worked on Spark Structured Streaming with Python guide me on how to proceed, with sample examples or links?

Using this:

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

indata = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
data = indata.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("append").format("console").start()

The program runs, but I am getting these values on the console:

+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "0" : 366
      }
    },
    "endOffset" : {
      "test" : {
        "0" : 368
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  }
}

Did I miss anything here?

You can either use from_json with a schema:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))

or extract individual fields as strings with get_json_object:

from pyspark.sql.functions import col, get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

and cast them later according to your needs.

