1.创建生产者

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])

future = producer.send("pic_collect", b'I am rito yan')
try:
    record_metadata = future.get(timeout=10)
    print(record_metadata)
except KafkaError as e:
    print(e)

2.创建消费者:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "pic_collect",
    group_id="pic_consumer",
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
11-12 03:38