This article covers how to process JSON tabular data streaming in from Kafka topics in Python.

Problem Description

I have events streaming into multiple Kafka topics in the form of key:value JSONs (without nested structure), for example:

event_1: {"name": "Alex", "age": 27, "hobby": "pc games"},
event_2: {"name": "Bob", "age": 33, "hobby": "swimming"},
event_3: {"name": "Charlie", "age": 12, "hobby": "collecting stamps"}

I am working in Python 3.7 and want to consume a batch of events from those topics, say every 5 minutes, transform it into a dataframe, do some processing and enrichment with this data, and save the result to a CSV file.

I'm new to Spark and searched for documentation to help me with this task but did not find any. Is there an updated source of information you would recommend?
Also, if there is any other recommended Big Data framework that would suit this task, I'd love to hear about it.

Recommended Answer

Refer to the triggers section of the Structured Streaming Programming Guide. There are three different trigger types; the default is micro-batch mode, in which the next micro-batch is generated as soon as the previous micro-batch has finished processing.
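As a reference sketch (assuming a streaming dataframe named df; the console sink is used here only as a placeholder), the trigger variants are specified through DataStreamWriter.trigger like this:

# Default: a new micro-batch starts as soon as the previous one finishes
df.writeStream.format("console").start()

# Fixed-interval micro-batches: kick off a micro-batch every 5 minutes
df.writeStream.format("console").trigger(processingTime="5 minutes").start()

# One-time micro-batch: process all available data once, then stop
df.writeStream.format("console").trigger(once=True).start()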

In your case you need fixed-interval micro-batches, where you can specify the interval at which the query has to be triggered. The following snippet shows how to do that.

# Fixed-interval trigger: one micro-batch every 5 minutes
df.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .trigger(processingTime='5 minutes') \
    .start()

Brief code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Define the schema of the Kafka message value

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("hobby", StringType(), True),
])

# Initialize the Spark session

spark = SparkSession.builder.appName("example").getOrCreate()

# Read the Kafka topic and parse the JSON value using the schema above.
# kafka.bootstrap.servers should point at the Kafka brokers (typically port 9092).

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "x.x.x.x:9092") \
    .option("startingOffsets", "latest") \
    .option("subscribe", "testdata") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select(col("data.*"))
# Do some transformation / enrichment (an illustrative example follows this listing)
df1 = df...

# Write the resultant dataframe as CSV files
# (the file sink requires a checkpoint location)

df1.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .trigger(processingTime='5 minutes') \
    .start()
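For illustration only, the transformation step (df1 = df... above) could look something like the following; the age_group column and the age threshold are invented for this sketch and are not part of the original answer:

from pyspark.sql.functions import when

# Hypothetical enrichment: derive an age_group column from the parsed age field
df1 = df.withColumn(
    "age_group",
    when(col("age") < 18, "minor").otherwise("adult")
)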

You can also repartition the final dataframe before writing it out as CSV files, if needed.
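For example, a minimal sketch reusing the write options above (paths are placeholders); repartition(1) collapses each micro-batch's output into a single CSV part file:

df1.repartition(1).writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .trigger(processingTime='5 minutes') \
    .start()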
