Kafka and Spark: get the first offset of a topic via the API -
I am playing with Spark Streaming and Kafka (with the Scala API), and I would like to read a message set from a set of Kafka topics with Spark Streaming.
The following method:
    val kafkaParams = Map(
      "metadata.broker.list" -> configuration.getKafkaBrokersList(),
      "auto.offset.reset" -> "smallest")
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
reads Kafka from the latest available offset, but doesn't give me the metadata I need (since I am reading from a set of topics, I need to know which topic every message was read from). The other overload,

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)

expects explicit offsets, which I don't have.
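For context, a minimal sketch of how the second overload could be wired up. The `earliestOffsets` helper is hypothetical (it stands in for whatever mechanism resolves starting offsets, e.g. an OffsetRequest as in the answer below); the rest uses the Spark 1.x `KafkaUtils` direct-stream API, where the `messageHandler` lets you keep the topic name alongside each message:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: resolve the earliest offset of every partition
// of the given topics (e.g. via a GetOffsetShell-style OffsetRequest).
def earliestOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = ???

val fromOffsets: Map[TopicAndPartition, Long] = earliestOffsets(topics)

// Keep the topic name next to the payload, so each record remembers
// which topic it was read from.
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)
```

This is only a sketch of the wiring, not a complete program; it assumes `ssc` and `kafkaParams` from the snippet above are in scope.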
I know there is a shell command that gives the last offset:

    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>:<port> --topic <topic-name> --time -1 --offsets 1
And KafkaCluster.scala, a developer API that used to be public, gives exactly what I would like.
Any hint?
You can use the code from GetOffsetShell.scala in the Kafka API:

    val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
    val topicAndPartition = TopicAndPartition(topic, partitionId)
    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
Or you can create a new consumer with a unique groupId and use it to get the first offset:

    import scala.collection.JavaConverters._

    val consumer = new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
    consumer.partitionsFor(config.topic).asScala.foreach { pi =>
      val topicPartition = new TopicPartition(pi.topic(), pi.partition())
      consumer.assign(List(topicPartition).asJava)
      consumer.seekToBeginning(topicPartition)
      val firstOffset = consumer.position(topicPartition)
      ...
    }