Problem Description
Currently I'm working with Kafka/Zookeeper and pySpark (1.6.0). I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().
The streaming itself works without problems, but I noticed that my Kafka topics are not updated to the current offset after I have consumed some messages.
Since we need the topics to be updated in order to have monitoring in place here, this is somewhat odd.
In the documentation of Spark I found this comment:
offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

directKafkaStream \
    .transform(storeOffsetRanges) \
    .foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show the progress of the streaming application.
Here is the documentation: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
I found a solution in Scala, but I can't find an equivalent for Python. Here is the Scala example: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
Question
But the question is: how am I able to update Zookeeper from that point on?
I wrote some functions to save and read Kafka offsets with the Python kazoo library.
First, a function to get a singleton Kazoo client:
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']
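Since the client is cached in globals(), repeated calls inside the same Python process reuse a single ZooKeeper connection, which is convenient when foreachRDD invokes the save function batch after batch on the driver. A quick sanity check of that behaviour (it assumes a ZooKeeper server is actually reachable at ZOOKEEPER_SERVERS):

zk = get_zookeeper_instance()
assert zk is get_zookeeper_instance()  # the same KazooClient instance is reused
print(zk.get_children("/"))            # e.g. ['zookeeper', 'consumers', ...]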
Then functions to read and write offsets:
def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    # Offsets are stored as one znode per partition under /consumers/<topic>.
    from_offsets = {}
    for topic in topics:
        for partition in zk.get_children(f'/consumers/{topic}'):
            topic_partition = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets
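Note that on the very first run the /consumers/<topic> znodes will not exist yet, so zk.get_children() would raise kazoo.exceptions.NoNodeError. A defensive variant (just a sketch under the same znode layout; read_offsets_safe is not part of the original answer) could skip missing nodes so the stream simply starts from the default offsets:

from kazoo.exceptions import NoNodeError

def read_offsets_safe(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        try:
            partitions = zk.get_children(f'/consumers/{topic}')
        except NoNodeError:
            continue  # nothing stored for this topic yet
        for partition in partitions:
            data, _stat = zk.get(f'/consumers/{topic}/{partition}')
            from_offsets[TopicAndPartition(topic, int(partition))] = int(data)
    return from_offsets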
def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())
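The function passed to foreachRDD runs on the driver, which is why offsetRanges() and the singleton ZooKeeper client are available there. After a few batches you can check what was written with plain kazoo (topic test1 and the printed offset value are only examples):

zk = get_zookeeper_instance()
for partition in sorted(zk.get_children('/consumers/test1')):
    data, _ = zk.get(f'/consumers/test1/{partition}')
    print(partition, int(data))  # e.g. "0 1523" -> partition 0 is at offset 1523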
Then, before starting the streaming job, you can read the offsets from Zookeeper and pass them to createDirectStream as the fromOffsets argument:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    # Store the processed offsets back to Zookeeper after every batch.
    directKafkaStream.foreachRDD(save_offsets)

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
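In a real job you would normally do the actual processing inside the same foreachRDD call and store the offsets only afterwards, so that the offsets saved in Zookeeper never run ahead of the data that was really processed. A sketch of that pattern (process_partition is a hypothetical placeholder for your own logic):

def process_partition(records):
    # Hypothetical placeholder -- replace with your own per-partition logic.
    for record in records:
        pass

def process_and_save(rdd):
    rdd.foreachPartition(process_partition)
    # Persist the new offsets only after the processing above has succeeded.
    save_offsets(rdd)

# Used instead of directKafkaStream.foreachRDD(save_offsets) in main():
directKafkaStream.foreachRDD(process_and_save)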