1 # -*- coding: utf-8 -*-
2
3 import uuid
4 import json
5 from kafka import KafkaConsumer
6 from xxxxxx import MessageToDict
7 from xxx import ObjectInfo
8
9 import sys
10 import codecs
11
12 sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
13
14
15 class ReadKafkaContent(object):
16 @staticmethod
17 def deserialize(msg):
18 """
19 反序列化
20 :param msg:
21 :return:
22 """
23 pb_obj = ObjectInfo()
24 pb_obj.Clear()
25 pb_obj.ParseFromString(msg.value)
26 return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True)
27
28 def consume_msg(self, consumer_obj):
29 """
30 逐条消费,返回反序列化后的内容
31 :param consumer_obj:
32 :return:
33 """
34 try:
35 while True:
36 msg = next(consumer_obj, None)
37 if not msg:
38 continue
39 content = self.deserialize(msg)
40 return content
41 except Exception as ex:
42 print(u"消费kafka错误,退出测试")
43 return None
44
45 def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"):
46 """
47
48 :param topic:topic
49 :param ip:ip
50 :param count:查询kafka数据数量,默认10条
51 :param log:内容保存地址,默认
52 :return:
53 """
54 print(u"开始......")
55 try:
56 # 创建kafka消费对象
57 print(u"创建kafka消费对象...")
58 consumer = KafkaConsumer(topic, group_id=uuid.uuid4().hex,
59 bootstrap_servers=[ip],
60 auto_offset_reset="latest", consumer_timeout_ms=3 * 1000)
61 except Exception as ex:
62 print(u"连接kafka失败!")
63 return False
64 print(u"kafka消费对象创建成功.")
65
66 with open(log, "w") as f:
67 for i in range(count):
68 print(u"开始消费第%s条数据..." % str(i + 1))
69 content = self.consume_msg(consumer)
70 if not content:
71 return False
72
73 # dict转json保存数据内容
74 content_json = json.dumps(content, ensure_ascii=False, indent=4)
75 f.write(content_json)
76 f.write("\n\n")
77 print(u"第%s条数据写入完成." % str(i + 1))
78
79 print(u"完成.")