1.创建生产者:
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Producer example: publish one message to the "pic_collect" topic and
# print the resulting record metadata (or the Kafka error on failure).
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])

try:
    # send() hands back a future; the result is only known once it resolves.
    future = producer.send("pic_collect", b'I am rito yan')
    try:
        # Wait at most 10 seconds for the send to complete.
        record_metadata = future.get(timeout=10)
    except KafkaError as e:
        print(e)
    else:
        print(record_metadata)
finally:
    # Always release the producer's connections, even if send() raised.
    producer.close()
2.创建消费者:
from kafka import KafkaConsumer

# Consumer example: join the "pic_consumer" group, read the "pic_collect"
# topic and print every message received.
consumer = KafkaConsumer(
    "pic_collect",
    group_id="pic_consumer",
    bootstrap_servers=['127.0.0.1:5000', '127.0.0.1:5001', '127.0.0.1:5002'])

try:
    # No consumer_timeout_ms is set, so this loop is expected to keep
    # waiting for new messages until the process is interrupted.
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (
            message.topic, message.partition, message.offset,
            message.key, message.value))
finally:
    # Close the consumer on any exit path (Ctrl-C, error) instead of
    # leaving its connections open until interpreter shutdown.
    consumer.close()