建立:
我有3个Docker容器
1) For Kafka
2) For Zookeeper
3) For JupyterLab
我在这些容器之间建立了网络,我发现kafka生产者能够运行和生产数据。
KafkaProducer.ipynb
KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)
for _ in range(100):
print("sending")
producer.send('my-topic', key=b'foo', value=b'bar')
print("success")
在这里,send()发送消息100次。
KafkaConsumer.ipynb
KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=KAFKA_BROKER)
print("Comm success")
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
在上面的使用者代码中,行
print("Comm success")
从不执行。基于生产者代码执行,网络是开放的,jupyter能够与kafka经纪人对话。但是,客户端无法连接到同一代理进行数据消耗。我该如何开始调试呢? 最佳答案
默认情况下,auto.offset.reset
的值为latest
,因此使用新的group.id将其设置为earliest
。
consumer = KafkaConsumer('my-topic',group_id='new-group',auto_offset_reset = 'earliest',bootstrap_servers=KAFKA_BROKER)