
Problem Description

Trying to make sense of Spark SQL with respect to Spark Structured Streaming.
A Spark session reads events from a Kafka topic, aggregates the data into counts grouped by different column names, and prints them to the console.
The raw input data is structured like this:

+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|   sourceTypes|                Guid|  platform|datacenter|pagesId|     eventTimestamp|              Id1234|  Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................|   ANDROID|       dev|     aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|   ANDROID|       dev|     ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+

Counts are required for sourceTypes, platform, datacenter, and pageId.

Aggregating the data with the following code:

import static org.apache.spark.sql.functions.*;

// Tolerate events arriving up to watermarkInterval late, then group
// them into time windows plus the three dimension columns.
Dataset<Row> aggregatedDataset = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .select(
            col("eventTimestamp"),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .groupBy(
            window(col("eventTimestamp"), windowInterval),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .agg(
            max(col("eventTimestamp"))
        );

Here watermarkInterval = "45 seconds", windowInterval = "15 seconds", and triggerInterval = "15 seconds".
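These are plain interval strings; a minimal sketch of how they might be declared (the names come from the code above, and the String type is what withWatermark, window, and Trigger.ProcessingTime all accept):

String watermarkInterval = "45 seconds"; // how late events may arrive
String windowInterval = "15 seconds";    // tumbling window length
String triggerInterval = "15 seconds";   // micro-batch cadence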

Writing out the new aggregated dataset:

// Start the streaming query and block until it terminates.
aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start()
        .awaitTermination();

There are a couple of issues:

  1. The output data is not printing the counts for each groupBy column like platform, pageId, etc.

  2. How can the output be printed in JSON format? I tried using select(to_json(struct("*")).as("value")) when writing the data to the console, but it doesn't work.

Recommended Answer

You can solve your problem using the following code snippet:

.outputMode("complete")
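For context: in Append mode an aggregated row is only emitted after the watermark passes the end of its window, so short test runs may print nothing, while Complete mode re-emits the entire result table on every trigger (the watermark is then no longer used to drop state). Below is a minimal sketch of the full write path under Complete mode, reusing the names from the question; switching the aggregation from max(eventTimestamp) to count(), and projecting each row through to_json(struct(...)), are assumptions added here to address both issues and are not part of the accepted answer:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

// Count events per window and dimension instead of taking
// max(eventTimestamp), so the grouped counts actually appear.
// (Assumption: count() in place of the question's max aggregation.)
Dataset<Row> counts = sourceDataset
        .groupBy(
            window(col("eventTimestamp"), windowInterval),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .count();

// Serialize each result row into a single JSON string column and
// re-print the whole result table on every trigger.
counts
        .select(to_json(struct(col("*"))).as("value"))
        .writeStream()
        .outputMode(OutputMode.Complete())
        .format("console")
        .option("truncate", "false")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start()
        .awaitTermination();

Note the trade-off: in Complete mode Spark must retain every group seen so far, so for an unbounded key space, Append or Update mode with a finite watermark is the more scalable choice.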

