使用 Python 3 第三方库 kafka-python 实现 Kafka 消息消费

 1 # -*- coding: utf-8 -*-
 2
 3 import uuid
 4 import json
 5 from kafka import KafkaConsumer
 6 from xxxxxx import MessageToDict
 7 from xxx import ObjectInfo
 8
 9 import sys
10 import codecs
11
import io

# Re-wrap the raw stdout buffer so the (Chinese) progress messages are always
# encoded as UTF-8, whatever the locale's default encoding is.
# A TextIOWrapper with line buffering is used instead of a bare codecs
# StreamWriter: the StreamWriter writes into the block-buffered binary stream
# and never flushes per line, so progress output would be held back while the
# script is blocked waiting for Kafka records.
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding="utf-8", line_buffering=True)
13
14
class ReadKafkaContent(object):
    """Consume protobuf-encoded records from a Kafka topic and dump them as JSON."""

    @staticmethod
    def deserialize(msg):
        """
        Decode one consumed record into a dict.

        :param msg: consumed record; only its ``value`` attribute (the serialized
            ``ObjectInfo`` bytes) is read.
        :return: result of ``MessageToDict`` for the parsed message. The two keyword
            flags ask for default-valued fields to be included and for the original
            proto field names to be kept (presumably protobuf's json_format helper;
            confirm against the real import).
        """
        # A freshly constructed message is already empty, so no Clear() is needed.
        pb_obj = ObjectInfo()
        pb_obj.ParseFromString(msg.value)
        return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True)

    def consume_msg(self, consumer_obj):
        """
        Wait for one record and return its deserialized content.

        ``next(consumer_obj, None)`` gives ``None`` whenever the iterator stops
        (presumably when the consumer's idle timeout expires); polling is then
        retried, so this call keeps waiting until a record arrives.

        :param consumer_obj: iterator of consumed records (the Kafka consumer).
        :return: dict produced by :meth:`deserialize`, or ``None`` if consuming or
            decoding raised an exception.
        """
        try:
            while True:
                msg = next(consumer_obj, None)
                if msg is None:
                    continue
                return self.deserialize(msg)
        except Exception as ex:
            # Report the cause instead of silently discarding it.
            print(u"消费kafka错误,退出测试: %s" % ex)
            return None

    def _create_consumer(self, topic, ip):
        """Build the consumer for ``topic`` on broker ``ip`` with a throwaway group id."""
        return KafkaConsumer(
            topic,
            # A random group id means no committed offsets exist, so together with
            # auto_offset_reset="latest" only newly produced records are read.
            group_id=uuid.uuid4().hex,
            bootstrap_servers=[ip],
            auto_offset_reset="latest",
            consumer_timeout_ms=3 * 1000,
        )

    def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"):
        """
        Consume ``count`` records and write each one to ``log`` as indented JSON.

        :param topic: Kafka topic to subscribe to.
        :param ip: bootstrap server address passed to the consumer.
        :param count: number of records to read, 10 by default.
        :param log: path of the output file; it is overwritten and written as UTF-8.
        :return: ``True`` when all ``count`` records were written, ``False`` when the
            consumer could not be created or a record could not be consumed/decoded.
        """
        print(u"开始......")
        print(u"创建kafka消费对象...")
        try:
            consumer = self._create_consumer(topic, ip)
        except Exception as ex:
            print(u"连接kafka失败! %s" % ex)
            return False
        print(u"kafka消费对象创建成功.")

        try:
            # Explicit UTF-8: json.dumps(ensure_ascii=False) emits raw non-ASCII
            # characters, which a locale-dependent default encoding may reject.
            with open(log, "w", encoding="utf-8") as f:
                for index in range(1, count + 1):
                    print(u"开始消费第%s条数据..." % index)
                    content = self.consume_msg(consumer)
                    # Only None signals failure; an empty dict is a valid record.
                    if content is None:
                        return False

                    # Serialize the dict as JSON; a blank line separates records.
                    content_json = json.dumps(content, ensure_ascii=False, indent=4)
                    f.write(content_json)
                    f.write("\n\n")
                    print(u"第%s条数据写入完成." % index)
        finally:
            # Always release the broker connection, on success and on failure.
            consumer.close()

        print(u"完成.")
        return True
01-02 06:57