kafka and Spark: Get first offset of a topic via API -


i playing spark streaming , kafka (with scala api), , read message set of kafka topics spark streaming.

the following method:

val kafkaparams = map("metadata.broker.list" -> configuration.getkafkabrokerslist(), "auto.offset.reset" -> "smallest") kafkautils.createdirectstream[string, string, stringdecoder, stringdecoder](ssc, kafkaparams, topics) 

reads kafka latest available offset, doesn't give me metadata need (since reading set of topics, need every message read topic) other method kafkautils.createdirectstream[string, string, stringdecoder, stringdecoder, tuple2[string, string]](ssc, kafkaparams, currentoffsets, messagehandler) wants explicitly offset don't have.

i know there shell command gives last offset.

kafka-run-class.sh kafka.tools.getoffsetshell    --broker-list <broker>:  <port>    --topic <topic-name> --time -1 --offsets 1  

and kafkacluster.scala api developers used public , gives like.

hint?

you can use code getoffsetshell.scala kafka api documentation

val consumer = new simpleconsumer(leader.host, leader.port, 10000, 100000, clientid) val topicandpartition = topicandpartition(topic, partitionid) val request = offsetrequest(map(topicandpartition -> partitionoffsetrequestinfo(time, noffsets))) val offsets = consumer.getoffsetsbefore(request).partitionerrorandoffsets(topicandpartition).offsets 

or can create new consumer unique groupid , use getting first offset

val consumer=new kafkaconsumer[string, string](createconsumerconfig(config.brokerlist)) consumer.partitionsfor(config.topic).foreach(pi => {       val topicpartition = new topicpartition(pi.topic(), pi.partition())        consumer.assign(list(topicpartition))       consumer.seektobeginning()       val firstoffset = consumer.position(topicpartition)  ... 

Comments

Popular posts from this blog

Command prompt result in label. Python 2.7 -

javascript - How do I use URL parameters to change link href on page? -

amazon web services - AWS Route53 Trying To Get Site To Resolve To www -