This walkthrough was done on a Mac. Install Kafka before using it; the steps are as follows:
1. Install Kafka
brew install kafka
(1) Homebrew installs ZooKeeper as a dependency; JDK 8 is required but not installed automatically, so install it separately.
(2) Install locations:
/usr/local/Cellar/zookeeper
/usr/local/Cellar/kafka
(3) Config file locations:
/usr/local/etc/kafka/zookeeper.properties (default port 2181)
/usr/local/etc/kafka/server.properties (default port 9092)
2. Start ZooKeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
3. Start the Kafka server
kafka-server-start /usr/local/etc/kafka/server.properties &
4. Create a topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. List the topics that have been created
kafka-topics --list --zookeeper localhost:2181
6. Produce messages
kafka-console-producer --broker-list localhost:9092 --topic test
7. Consume messages
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
Note: --from-beginning makes the consumer receive messages starting from the first one in the topic.
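The console flag above has a counterpart in the kafka-python library used later in this tutorial: passing auto_offset_reset='earliest' to KafkaConsumer makes a consumer group with no committed offset start from the earliest message. A minimal sketch (the group name 'demo-group' is just an example value, not from the tutorial):

```python
# Sketch: assemble KafkaConsumer options that mirror the console flags above.
# auto_offset_reset='earliest' plays the role of --from-beginning for a
# consumer group that has no committed offset yet.

def consumer_kwargs(from_beginning):
    """Pure helper: build KafkaConsumer keyword arguments (no connection made)."""
    kwargs = {
        'bootstrap_servers': ['127.0.0.1:9092'],
        'group_id': 'demo-group',  # example group name
    }
    if from_beginning:
        kwargs['auto_offset_reset'] = 'earliest'
    return kwargs

if __name__ == '__main__':
    # Requires kafka-python and a broker listening on 127.0.0.1:9092
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('test', **consumer_kwargs(from_beginning=True))
    for message in consumer:
        print(message.value)
```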
8. Kafka in Python (with the kafka-python package)
(1) Producer implementation, producer.py:
# -*- coding: utf-8 -*-
from kafka import KafkaProducer

"""Producer module"""


class Producer(object):
    def __init__(self, kafka_hosts, kafka_topic):
        self.kafka_topic = kafka_topic
        # bootstrap_servers may list several brokers
        self.producer = KafkaProducer(bootstrap_servers=[kafka_hosts])
        self.send()

    def send(self):
        # send() is asynchronous and returns a future; get() blocks for the result
        future = self.producer.send(self.kafka_topic, key=b'my_key',
                                    value=b'my_value', partition=0)
        result = future.get(timeout=10)
        print(result)


if __name__ == '__main__':
    p = Producer('localhost:9092', 'my_topic')
(2) Consumer implementation, consumer.py:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

"""Consumer module"""


class Consumer(object):
    def __init__(self, kafka_hosts, kafka_topic):
        self.consumer = KafkaConsumer(kafka_topic, bootstrap_servers=[kafka_hosts])

    def consume(self):
        # Iterating over the consumer blocks and yields messages as they arrive
        for message in self.consumer:
            print(message)


if __name__ == '__main__':
    c = Consumer('localhost:9092', 'my_topic')
    c.consume()
Run consumer.py first (python3 consumer.py), then run python3 producer.py; the consumer prints:
➜ kafka_client git:(master) ✗ python3 consumer.py
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1546670287366, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)
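The value field in the ConsumerRecord above comes back as raw bytes. To exchange structured data instead, kafka-python's KafkaProducer and KafkaConsumer accept value_serializer / value_deserializer callables; a sketch using JSON (reusing the 'my_topic' topic from the example above):

```python
import json

# Sketch: JSON (de)serialization helpers for kafka-python. The producer's
# value_serializer turns a Python object into bytes before sending; the
# consumer's value_deserializer reverses the conversion on receipt.

def to_bytes(value):
    """Serialize a Python object to UTF-8 JSON bytes."""
    return json.dumps(value).encode('utf-8')

def from_bytes(raw):
    """Deserialize UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode('utf-8'))

if __name__ == '__main__':
    # Requires kafka-python and a broker listening on localhost:9092
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=to_bytes)
    producer.send('my_topic', {'event': 'demo', 'count': 1})
    producer.flush()
```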