目录

1. 链接

2. 从Kafka读数据

2.1 从流查询创建Kafka数据源

2.2 从批查询Kafka数据源(spark.readStream变成了spark.read)

3. 向Kafka写数据

3.1 创建流查询Kafka Sink

3.2 创建批查询Kafka Sink

4 Kafka 特有参数配置


1. 链接

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.4.0

2. 从Kafka读数据

2.1 从流查询创建Kafka数据源

// 订阅一个主题
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribe", "topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

//  订阅多个主题
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribe", "topicA,topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅通过正则匹配的主题
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "idayan00:9092,idayan01:9092,idayan02:9092")
  .option("subscribePattern", "topic*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

2.2 从批查询Kafka数据源(spark.readStream变成了spark.read)

// 订阅一个主题, 默认使用最早、最新的偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topicA")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅多个主题,, 指定明确的 Kafka 偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topicA,topicA")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅通过正则匹配的主题, 使用最早、最新的偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

数据源中各字段模式:

Kafka source必需的参数如下:

下述配置是可选的:

3. 向Kafka写数据

这里我们讨论如何将流查询和批查询结果写出到Apache Kafka.需要注意的是Apache Kafka只支持最少一次的写算法。所以, 当写到Kafka时(不管是流查询还是批查询),有些消息可能会重复。这是可能发生的,例如,kafka需要重试没有被Broker确认的信息时,即使Broker已经接收到并且写入了这个消息。因为kafka的这个写算法,Structured Streaming 不能保证这样的重复不发生。不过,如果写操作成功,你可以认为这个查询结果最少被写出了一次。一个在读取这些数据时去除重的方案是引入一个唯一key。

    写出到Kafka的DataFrame应该有下列结构:

* 如果主题的配置选项没有指定,那么topic这一列是必须的。

    value 列是唯一一个必须的。如果key 列没有指定,会默认指定为null。如果主题列不存在,那么他的值会被用作主题写给kafka,除非设置了主题配置。主题配置会覆盖主题列。

Kafka sink 必须配置下列选项:

下述参数是可选的:

3.1 创建流查询Kafka Sink

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

3.2 创建批查询Kafka Sink

/ Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

4 Kafka 特有参数配置

Kafka特有的配置可以通过DataStreamReader.option的kafka.前缀配置。例如 stream.option("kafka.bootstrap.servers", "host:port"),kafka可用的参数可以参见kafka生产者/消费者参数配置文档。
    请注意,下述参数不可以设置,否则kafka source 或sink将会抛出异常:

  • group.id: Kafka Source会为每个查询创建一个唯一的group id.
  • auto.offset.reset: 设为source选项 startingOffsets为指定offset.
  • key.deserializer: key总是使用ByteArrayDeserializer反序列化为字节数组。
  • value.deserializer:value总是使用ByteArrayDeserializer反序列化为字节数组。
  • key.serializer: key总是使用ByteArraySerializer 或StringSerializer序列化。
  • value.serializer:value 总是使用ByteArraySerializer 或StringSerializer序列化。
  • enable.auto.commit: Kafka source 不提交任何offset
  • interceptor.classes: kafka source将values读取做字节数组。使用ConsumerInterceptor可能会破坏查询,因此是不安全的。
12-21 21:33