Can't Consume JSON Messages From Kafka Using Kafka-Python's Deserializer


I am trying to send a simple JSON object through Kafka and read it out the other side using Python and kafka-python. However, I keep seeing the following error:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "c:\anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "c:\anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "c:\users\myuser\workspace\pythonkafkatest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "c:\anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "c:\anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "c:\anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

I've done some research, and the most common cause of this error is malformed JSON. I have tried printing out the JSON before I send it by adding the following code, and the JSON prints with no errors:

while True:
    json_obj1 = json.dumps({"dataobjectid": "test1"})
    print json_obj1
    producer.send('my-topic', {"dataobjectid": "test1"})
    producer.send('my-topic', {"dataobjectid": "test2"})
    time.sleep(1)

This leads me to suspect that I can produce JSON but not consume it.
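As a quick check that needs no broker, the producer's serializer can be called directly. This is a minimal sketch assuming Python 3 (the question runs Python 2, where json.dumps already returns a byte string); the function below is just the question's value_serializer lambda written as a def. The 25-byte result lines up with serialized_value_size=25 in the consumer records shown later in the post.

```python
import json


def value_serializer(v):
    # Same steps as the question's lambda: object -> JSON text -> UTF-8 bytes.
    return json.dumps(v).encode('utf-8')


payload = value_serializer({"dataobjectid": "test1"})
print(payload)       # b'{"dataobjectid": "test1"}'
print(len(payload))  # 25
```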

Here is my code:

import threading
import logging
import time
import json
from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataobjectid": "test1"})
            producer.send('my-topic', {"dataobjectid": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

I can send and receive strings if I remove the value_serializer and value_deserializer. When I run that version of the code, I can see the JSON I am sending in. Here is a short snippet:

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataobjectid": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataobjectid": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
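The snippet also shows that the topic holds plain strings alongside the JSON, such as value='test' at offset 5744 and value='\xc2hello, stranger!' at offset 5745, presumably left over from the runs without a serializer. Because the consumer uses auto_offset_reset='earliest' and commits no offsets, it starts from the oldest retained records, and json.loads cannot parse those whichever deserializer is used. Below is a minimal sketch of a tolerant deserializer, assuming it is acceptable to drop such records; safe_json_deserializer is a hypothetical name, and returning None is a design choice, not something kafka-python requires.

```python
import json
import logging

log = logging.getLogger(__name__)


def safe_json_deserializer(m):
    """Hypothetical helper: decode UTF-8 bytes and parse JSON, else return None.

    UnicodeDecodeError and json's decode error are both subclasses of
    ValueError, so a single except clause covers both failure modes.
    """
    try:
        return json.loads(m.decode('utf-8'))
    except ValueError:
        log.warning("skipping non-JSON message: %r", m)
        return None


# Raw values taken from the ConsumerRecord dump in the question.
for raw in (b'{"dataobjectid": "test1"}', b'test', b'\xc2hello, stranger!'):
    print(safe_json_deserializer(raw))
```

Passed as value_deserializer=safe_json_deserializer, this should give message.value == None for the leftover string records instead of raising.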

So I tried removing only the value_deserializer from the consumer, and that code executes, but without the deserializer the message comes out as a string, which isn't what I need. So why doesn't the value_deserializer work? Is there a different way to get the JSON out of a Kafka message that I should be using?

My problem was solved by first decoding the message from UTF-8 and then calling json.loads on it:

value_deserializer=lambda m: json.loads(m.decode('utf-8')) 

instead of:

value_deserializer=lambda m: json.loads(m).decode('utf-8') 
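The two lambdas differ only in the order of the calls. The minimal sketch below runs without a broker and assumes Python 3.6+ (where json.loads also accepts bytes): the fixed version turns the raw bytes into text before parsing, while the original parses first and then calls .decode() on the resulting dict, which has no such method. On Python 2 the original fails the same way for valid JSON, since json.loads also returns a dict there.

```python
import json

raw = b'{"dataobjectid": "test1"}'  # the kind of bytes the consumer passes in


def fixed(m):
    # Decode bytes -> str first, then parse the JSON text.
    return json.loads(m.decode('utf-8'))


def broken(m):
    # Parses first, then calls .decode() on the resulting dict.
    return json.loads(m).decode('utf-8')


print(fixed(raw))  # {'dataobjectid': 'test1'}

try:
    broken(raw)
except AttributeError as exc:
    print("broken:", exc)  # broken: 'dict' object has no attribute 'decode'
```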

I hope this works on the producer's side too.
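On the producer's side, the question's value_serializer already performs its steps in the right order (json.dumps, then .encode('utf-8')), so the fixed deserializer is its exact inverse. Here is a minimal round-trip sketch under that assumption; the two function names merely mirror the KafkaProducer/KafkaConsumer parameter names and are plain local functions here, with no broker involved.

```python
import json


def value_serializer(v):
    # Producer side, as in the question: object -> JSON text -> UTF-8 bytes.
    return json.dumps(v).encode('utf-8')


def value_deserializer(m):
    # Consumer side, as in the fix: UTF-8 bytes -> JSON text -> object.
    return json.loads(m.decode('utf-8'))


for obj in ({"dataobjectid": "test1"}, {"dataobjectid": "test2"}):
    wire = value_serializer(obj)
    assert isinstance(wire, bytes)
    assert value_deserializer(wire) == obj
print("round trip ok")
```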

