This experiment was done on a Mac. Kafka has to be installed before it can be used; the steps are as follows:

1. Install Kafka

 brew install kafka

(1)  Installing Kafka depends on JDK 8 and ZooKeeper; Homebrew installs ZooKeeper as a dependency automatically, but JDK 8 is not included and has to be installed separately.

(2)  Installation locations

/usr/local/Cellar/zookeeper

/usr/local/Cellar/kafka

(3)  Configuration file locations

/usr/local/etc/kafka/zookeeper.properties  (default port 2181)

/usr/local/etc/kafka/server.properties  (default port 9092)

2. Start ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

3. Start the Kafka server

kafka-server-start /usr/local/etc/kafka/server.properties &

4. Create a topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
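
The topic can also be created from Python. A minimal sketch using kafka-python's admin client (this assumes a kafka-python version that ships KafkaAdminClient; the broker address and topic settings match the command above):

# -*- coding: utf-8 -*-
from kafka.admin import KafkaAdminClient, NewTopic

# connect to the local broker started in step 3
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# one partition, replication factor 1, matching the CLI command above
admin.create_topics(new_topics=[NewTopic(name='test', num_partitions=1, replication_factor=1)])
admin.close()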

5. List the created topics

kafka-topics --list --zookeeper localhost:2181  
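
The list of topics can also be read back from Python; a small sketch, assuming the broker from step 3 is still running:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

# topics() returns the set of topic names known to the broker
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())
consumer.close()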

6. Produce data

kafka-console-producer --broker-list localhost:9092 --topic test

7. Consume data

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

Note: --from-beginning makes the consumer start from the first message in the topic.
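
In kafka-python, the closest equivalent to --from-beginning is the auto_offset_reset='earliest' option; a sketch (the group id 'demo-group' is just an example name):

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

# with no committed offset, 'earliest' starts from the first message,
# like --from-beginning on the command line
consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:9092',
                         group_id='demo-group',
                         auto_offset_reset='earliest')
for record in consumer:
    print(record.value)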

8. Using Kafka from Python

(1) Producer implementation with the kafka-python package (pip install kafka-python); producer.py is as follows:

# -*- coding: utf-8 -*-
from kafka import KafkaProducer

"""
Producer module
"""
class Producer(object):

    def __init__(self, kafka_hosts, kafka_topic):
        self.kafka_topic = kafka_topic
        # bootstrap_servers may list several brokers
        self.producer = KafkaProducer(bootstrap_servers=[kafka_hosts])
        self.send()

    def send(self):
        # send() is asynchronous and returns a future; get() blocks until the
        # broker acknowledges the record or the timeout expires
        future = self.producer.send(self.kafka_topic, key=b'my_key', value=b'my_value', partition=0)
        result = future.get(timeout=10)
        print(result)

if __name__ == '__main__':
    p = Producer('localhost:9092', 'my_topic')
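
A common refinement is to let the producer serialize Python objects instead of raw bytes. A sketch using kafka-python's value_serializer option (the payload here is just an example):

# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

# serialize dicts to UTF-8 encoded JSON before sending
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('my_topic', {'event': 'demo', 'count': 1})
producer.flush()   # block until buffered records have been delivered
producer.close()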

(2) Consumer implementation; consumer.py is as follows:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

"""
Consumer module
"""
class Consumer(object):

    def __init__(self, kafka_hosts, kafka_topic):
        self.consumers = KafkaConsumer(kafka_topic, bootstrap_servers=[kafka_hosts])

    def consume(self):
        # iterating over the consumer blocks and yields ConsumerRecord objects
        for message in self.consumers:
            print(message)

if __name__ == '__main__':
    c = Consumer('localhost:9092', 'my_topic')
    c.consume()
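
Each message in the loop is a ConsumerRecord, so individual fields can be read instead of printing the whole record; a small sketch:

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
for record in consumer:
    # topic, partition, offset, key and value are fields of the ConsumerRecord
    print(record.topic, record.partition, record.offset, record.key, record.value)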

Run consumer.py with python3 consumer.py, then run python3 producer.py; the consumer prints the following:

➜  kafka_client git:(master) ✗ python3 consumer.py
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1546670287366, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)


GitHub: https://github.com/littlemesie/big_data_framework/tree/master/kafka_client
