Can't Consume JSON Messages From Kafka Using Kafka-Python's Deserializer
I am trying to send a simple JSON object through Kafka and read it out on the other side using Python and kafka-python. However, I keep seeing the following error:
2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "c:\anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "c:\users\myuser\workspace\pythonkafkatest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "c:\anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "c:\anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "c:\anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

I've done some research, and the most common cause of this error is malformed JSON. I have tried printing out the JSON before I send it by adding the following code, and the JSON prints with no errors.
while True:
    json_obj1 = json.dumps({"dataobjectid": "test1"})
    print(json_obj1)
    producer.send('my-topic', {"dataobjectid": "test1"})
    producer.send('my-topic', {"dataobjectid": "test2"})
    time.sleep(1)

This leads me to suspect that I can produce JSON, but not consume it.
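For reference, here is a small standalone sketch (Python 3, no Kafka broker needed) of what the question's producer setting value_serializer=lambda v: json.dumps(v).encode('utf-8') turns each dict into. The helper name serialize_value is hypothetical and only used for illustration. It suggests the producer side is fine: each record becomes UTF-8 JSON bytes, and the 25-byte length matches the serialized_value_size=25 reported for the JSON records in the consumer output.

```python
import json


def serialize_value(v):
    # Same transformation as the question's value_serializer:
    # dict -> JSON text (str) -> UTF-8 bytes, which is what Kafka actually stores.
    return json.dumps(v).encode('utf-8')


for obj in ({"dataobjectid": "test1"}, {"dataobjectid": "test2"}):
    payload = serialize_value(obj)
    print(type(payload).__name__, len(payload), payload)
# bytes 25 b'{"dataobjectid": "test1"}'
# bytes 25 b'{"dataobjectid": "test2"}'
```

So whatever the consumer's deserializer receives is bytes, and it has to undo both steps, in reverse order.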
Here is my code:
import threading
import logging
import time
import json
from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataobjectid": "test1"})
            producer.send('my-topic', {"dataobjectid": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print(message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

I can send and receive strings if I remove the value_serializer and value_deserializer. When I run the code that way, I can see the JSON I am sending in. Here is a short snippet:
ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataobjectid": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataobjectid": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

So I tried removing only the value_deserializer from the consumer. The code then executes, but without the deserializer the message comes out as a string, which isn't what I need. So why doesn't the value_deserializer work? Is there a different way I should be getting the JSON out of a Kafka message?
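The snippet above also shows that my-topic still holds non-JSON values such as 'test' and '\xc2hello, stranger!', and auto_offset_reset='earliest' makes the consumer replay them. json.loads raises a ValueError on such payloads however the bytes are decoded; in the traceback the ValueError is likewise raised inside json.loads itself, i.e. on a payload that did not parse as JSON. Below is a minimal sketch of a tolerant deserializer, assuming it is acceptable to skip such records. The name safe_json_deserializer is hypothetical and not part of kafka-python; the sample values are written as Python 3 bytes.

```python
import json


def safe_json_deserializer(m):
    """Hypothetical tolerant value_deserializer: returns None for payloads
    that are not valid UTF-8 JSON instead of raising."""
    if m is None:
        # Defensive: a null record value has nothing to decode.
        return None
    try:
        # Decode bytes to text first, then parse the JSON text.
        return json.loads(m.decode('utf-8'))
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError,
        # which are subclasses of ValueError.
        return None


# Raw values like the ones in the ConsumerRecord snippet above.
samples = [b'{"dataobjectid": "test1"}', b'test', b'\xc2hello, stranger!']
for raw in samples:
    print(raw, '->', safe_json_deserializer(raw))
# b'{"dataobjectid": "test1"}' -> {'dataobjectid': 'test1'}
# b'test' -> None
# b'\xc2hello, stranger!' -> None
```

It would be passed as value_deserializer=safe_json_deserializer to KafkaConsumer, and unparseable records then arrive with value=None. Silently skipping bad records is a trade-off; using a fresh topic, or auto_offset_reset='latest', avoids reading the old test messages in the first place.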
My problem was solved by decoding the message from UTF-8 first, and only then passing it to json.loads:
value_deserializer=lambda m: json.loads(m.decode('utf-8'))

instead of:

value_deserializer=lambda m: json.loads(m).decode('utf-8')

I hope this works on the producer's side as well.
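To see why the order matters, here is a small standalone comparison (Python 3, no broker needed; the function names fixed_deserializer and broken_deserializer are illustrative only). The raw value is built the same way the question's value_serializer builds it. Decoding first yields the dict, while the original order calls .decode() on whatever json.loads returns, and a dict has no decode method. On Python 3.5 and earlier, json.loads also rejects bytes outright with a TypeError.

```python
import json

# Bytes exactly as the question's producer value_serializer would emit them.
raw = json.dumps({"dataobjectid": "test1"}).encode('utf-8')


def fixed_deserializer(m):
    # Answer's order: bytes -> str via UTF-8, then str -> dict via json.loads.
    return json.loads(m.decode('utf-8'))


def broken_deserializer(m):
    # Question's order: parse first, then call .decode() on the parsed result.
    return json.loads(m).decode('utf-8')


print(fixed_deserializer(raw))  # {'dataobjectid': 'test1'}

try:
    broken_deserializer(raw)
except (AttributeError, TypeError) as exc:
    # AttributeError on Python 3.6+ (a dict has no .decode);
    # TypeError on Python 3.5 and earlier (json.loads rejects bytes).
    print(type(exc).__name__, exc)
```

The working function can be passed straight to the consumer, e.g. KafkaConsumer(bootstrap_servers='localhost:9092', value_deserializer=fixed_deserializer).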