先从源码来深入理解一下 DirectKafkaInputDStream 的将 kafka 作为输入流时,如何确保 exactly-once 语义。

val stream: InputDStream[(String, String, Long)] = KafkaUtils.createDirectStream
[String, String, StringDecoder, StringDecoder, (String, String, Long)](
ssc, kafkaParams, fromOffsets,
(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset))

对应的源码如下:

def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag,
R: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
val cleanedHandler = ssc.sc.clean(messageHandler)
new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, cleanedHandler)
}

DirectKafkaInputDStream 的类声明如下:

A stream of org.apache.spark.streaming.kafka.KafkaRDD where each given Kafka topic/partition corresponds to an RDD partition. 
The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number of messages per second that
each partition will accept. Starting offsets are specified in advance, and this DStream is not responsible for committing offsets,
so that you can control exactly-once semantics. For an easy interface to Kafka-managed offsets,
see org.apache.spark.streaming.kafka.KafkaCluster

简言之,Kafka RDD 的一个流,每一个指定的topic 的每一个 partition 对应一个 RDD partition

在父类 InputDStream 中,对 compute 方法的解释如下:

Method that generates a RDD for the given time
对于给定的时间,生成新的Rdd

这就是生成RDD 的入口:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
// 1. 先获取这批次数据的 until offsets
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
// 2. 生成KafkaRDD 实例
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker.
// 获取 该批次 的 offset 的范围
val offsetRanges = currentOffsets.map { case (tp, fo) =>
val uo = untilOffsets(tp) // 获取 until offset
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
//3. 将当前批次的metadata和offset 的信息报告给 InputInfoTracker
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// 4. 更新当前的 offsets
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}

获取这批次数据的 until offsets

详细分析 获取 leaderOffset 的步骤,即 latestLeaderOffsets 方法:

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) { // left 代表 error
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else { // right 代表结果
o.right.get
}
}

分析 kc.getLatestLeaderOffsets(currentOffsets.keySet)
字段赋值语句:protected val kc = new KafkaCluster(kafkaParams)
即调用了 KafkaCluster的getLatestLeaderOffsets
调用栈如下:

def getLatestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
// 调用了下面的方法:
def getLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition],
before: Long
): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
r.map { kv =>
// mapValues isnt serializable, see SI-7005
kv._1 -> kv._2.head
}
}
}
// getLeaderOffsets 调用了下面的方法,用于获取leader 的offset,现在是最大的offset:
def getLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition],
before: Long,
maxNumOffsets: Int
): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
// 获取所有的partition 的leader的 host和 port 信息
findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
// tp -> (l.host -> l.port) ==> (l.host -> l.port) ->seq[tp]
val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
// 所有的leader 的 连接方式
val leaders = leaderToTp.keys
var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
val errs = new Err
// 通过leader 获取每一个 leader的offset,现在是最大的 offset
withBrokers(leaders, errs) { consumer =>
val partitionsToGetOffsets: Seq[TopicAndPartition] =
leaderToTp((consumer.host, consumer.port))
val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
}.toMap
val req = OffsetRequest(reqMap)
val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets
partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
if (por.error == ErrorMapping.NoError) {
if (por.offsets.nonEmpty) {
result += tp -> por.offsets.map { off =>
LeaderOffset(consumer.host, consumer.port, off)
}
} else {
errs.append(new SparkException(
s"Empty offsets for ${tp}, is ${before} before log beginning?"))
}
} else {
errs.append(ErrorMapping.exceptionFor(por.error))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
Left(errs)
}
}
// 根据 TopicAndPartition 获取partition leader 的 host 和 port 信息
def findLeaders(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
val topics = topicAndPartitions.map(_.topic)
// 获取给定topics集合的所有的partition 的 metadata信息
val response = getPartitionMetadata(topics).right
// 获取所有的partition 的 leader 的 host 和port 信息
val answer = response.flatMap { tms: Set[TopicMetadata] =>
val leaderMap = tms.flatMap { tm: TopicMetadata =>
tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
val tp = TopicAndPartition(tm.topic, pm.partitionId)
if (topicAndPartitions(tp)) {
pm.leader.map { l =>
tp -> (l.host -> l.port)
}
} else {
None
}
}
}.toMap if (leaderMap.keys.size == topicAndPartitions.size) {
Right(leaderMap)
} else {
val missing = topicAndPartitions.diff(leaderMap.keySet)
val err = new Err
err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
Left(err)
}
}
answer
}
// 获取给定的 topic集合的所有partition 的metadata 信息
def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
// 创建TopicMetadataRequest对象
val req = TopicMetadataRequest(
TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
val errs = new Err
// 随机打乱 broker-list的顺序
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError) if (respErrs.isEmpty) {
return Right(resp.topicsMetadata.toSet)
} else {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
errs.append(new SparkException(msg, cause))
}
}
}
Left(errs)
}
// Try a call against potentially multiple brokers, accumulating errors
private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
(fn: SimpleConsumer => Any): Unit = {
//这里虽然是一个 foreach循环,但一旦获取到metadata,就返回,之所以使用一个foreach循环,是为了增加重试次数,
// 防止kafka cluster 的单节点宕机,除此之外,还设计了 单节点的多次重试机制。只不过是循环重试,即多个节点都访问完后,
// 再sleep 200ms(默认),然后再进行下一轮访问,可以适用于节点瞬间服务不可用情况。
brokers.foreach { hp =>
var consumer: SimpleConsumer = null
try {
// 获取SimpleConsumer 的连接
consumer = connect(hp._1, hp._2)
fn(consumer) // 发送请求并获取到partition 的metadata
/* fn 即 后面定义的
consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError) if (respErrs.isEmpty) {
return Right(resp.topicsMetadata.toSet)
} else {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
errs.append(new SparkException(msg, cause))
}
}
}
Left(errs)
*/
} catch {
case NonFatal(e) =>
errs.append(e)
} finally {
if (consumer != null) {
consumer.close()
}
}
}
} private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
m.groupBy(_._2).map { kv =>
kv._1 -> kv._2.keys.toSeq
}

然后,根据获取的 每一个 partition的leader 最大 offset 来,确定每一个partition的 until offset,即clamp 函数的功能:

// limits the maximum number of messages per partition
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { case (tp, lo) =>
// 评估的until offset = 当前offset + 评估速率
// 从 每一个topic partition leader 的最大offset 和 评估的 until offset 中选取较小值作为 每一个 topic partition 的 until offset
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
}
}.getOrElse(leaderOffsets) // 如果是第一次获取数据,并且没有设置spark.streaming.kafka.maxRatePerPartition 参数,则会返回 每一个 leader 的最大大小
} protected def maxMessagesPerPartition: Option[Long] = {
// rateController 是负责评估流速的
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
// 所有的 topic 分区数
val numPartitions = currentOffsets.keys.size
// 获取当前的流处理速率
val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0) // 过滤掉非正速率
.map { limit =>
// 通过spark.streaming.kafka.maxRatePerPartition设置这个参数,默认是0
if (maxRateLimitPerPartition > 0) {
// 从评估速率和设置的速率中取一个较小值
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else { // 如果没有设置,评估速率 / 分区数
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition) // 如果速率评估率不起作用时,使用设置的速率,如果不设置是 0 if (effectiveRateLimitPerPartition > 0) { // 如果每一个分区的有效速率大于0
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
// 转换成每ms的流速率
Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
} else {
None
}
}

生成KafkaRDD

KafkaRDD 伴生对象的 apply 方法:

def apply[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
R: ClassTag](
sc: SparkContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
untilOffsets: Map[TopicAndPartition, LeaderOffset],
messageHandler: MessageAndMetadata[K, V] => R
): KafkaRDD[K, V, U, T, R] = {
// 从 untilOffsets 中获取 TopicAndPartition 和 leader info( host, port) 的映射关系
val leaders = untilOffsets.map { case (tp, lo) =>
tp -> (lo.host, lo.port)
}.toMap val offsetRanges = fromOffsets.map { case (tp, fo) =>
// 根据 fromOffsets 和 untilOffset ,拼接成OffsetRange 对象
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}.toArray
// 返回 KafkaRDD class 的实例
new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

先看KafkaRDD 的解释:

A batch-oriented interface for consuming from Kafka.
Starting and ending offsets are specified in advance,
so that you can control exactly-once semantics.
从kafka 消费的针对批处理的API,开始和结束 的 offset 都提前设定了,所以我们可以控制exactly-once 的语义。

重点看 KafkaRDD 的 compute 方法,它以分区作为参数:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) { // 如果 from offset == until offset,返回一个空的迭代器对象
log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
new KafkaRDDIterator(part, context)
}
}

KafkaRDDIterator的源码如下,首先这个类比较好理解,因为只重写了两个非private 方法,close和 getNext, close 是用于关闭 SimpleConsumer 实例的(主要用于关闭socket 连接和 用于读response和写request的blockingChannel),getNext 是用于获取数据的

spark streaming 接收kafka消息之二 -- 运行在driver端的receiver-LMLPHP

类源码如下:

private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends NextIterator[R] { context.addTaskCompletionListener{ context => closeIfNeeded() } log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
// KafkaCluster 是与 kafka cluster通信的client API
val kc = new KafkaCluster(kafkaParams)
// kafka 消息的 key 的解码器
// classTag 是scala package 下的 package object – reflect定义的一个classTag方法,该方法返回一个 ClassTag 对象,
// 该对象中 runtimeClass 保存了运行时被擦除的范型Class对象, Decoder 的实现类都有一个 以VerifiableProperties
// 变量作为入参的构造方法。获取到构造方法后,利用反射实例化具体的Decoder实现对象,然后再向上转型为 Decoder
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[K]]
// kafka 消息的 value 的解码器
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[V]]
val consumer = connectLeader
var requestOffset = part.fromOffset
var iter: Iterator[MessageAndOffset] = null // The idea is to use the provided preferred host, except on task retry atttempts,
// to minimize number of kafka metadata requests
private def connectLeader: SimpleConsumer = {
if (context.attemptNumber > 0) {
// 如果重试次数大于 0, 则允许重试访问--bootstrap-server 列表里的所有 broker,一旦获取到 topic 的partition 的leader 信息,则马上返回
kc.connectLeader(part.topic, part.partition).fold(
errs => throw new SparkException(
s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
errs.mkString("\n")),
consumer => consumer
)
} else {
kc.connect(part.host, part.port)
}
}
// 在fetch数据失败时所做的操作,无疑,这是一个hook 函数
private def handleFetchErr(resp: FetchResponse) {
if (resp.hasError) {
val err = resp.errorCode(part.topic, part.partition)
if (err == ErrorMapping.LeaderNotAvailableCode ||
err == ErrorMapping.NotLeaderForPartitionCode) {
log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
Thread.sleep(kc.config.refreshLeaderBackoffMs)
}
// Let normal rdd retry sort out reconnect attempts
throw ErrorMapping.exceptionFor(err)
}
}
//注意此时的 返回结果是MessageAndOffset(Message(ByteBuffer)和 offset) 的迭代器
private def fetchBatch: Iterator[MessageAndOffset] = {
// 首先,见名之意,这是一个builder,作用就是构建一个FetchRequest 对象
val req = new FetchRequestBuilder()
.addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
.build()
// 调用 SimpleConsumer 的 fetch 方法,发送 FetchRequest 请求并获取返回的 topic 消息
val resp = consumer.fetch(req)
// 查看是否有错误,如果有,则抛出一场,否则继续处理返回的消息
handleFetchErr(resp)
// kafka may return a batch that starts before the requested offset
// 因为网络延迟等原因,可能会获取到之前的发送的请求结果,此时的 offset 是小于当前的 offset 的,需要过滤掉
resp.messageSet(part.topic, part.partition)
.iterator
.dropWhile(_.offset < requestOffset)
} override def close(): Unit = {
if (consumer != null) {
consumer.close()
}
}
// 我们重点看getNext 方法, 它的返回值 为R, 从KafkaUtils类中的初始化KafkaRDD 方法可以看出 R 其实是 <K,V>, 即会返回一个key 和 value的pair
override def getNext(): R = {
if (iter == null || !iter.hasNext) { // 第一次或者是已经消费完了
iter = fetchBatch // 调用 fetchBatch 方法,获取得到MessageAndOffset的迭代器
}
if (!iter.hasNext) { // 如果本批次没有数据需要处理或者本批次内还有所有数据均被处理,直接修改标识位,返回null
assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
finished = true
null.asInstanceOf[R]
} else {
val item = iter.next() // 获取下一个 MessageAndOffset 对象
if (item.offset >= part.untilOffset) { // 如果返回的消息大于等于本批次的until offset,则会返回 null
assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
finished = true
null.asInstanceOf[R]
} else { // 获取的 MessageAndOffse的Offset 大于等于 from offset并且小于 until offset
requestOffset = item.nextOffset // 需要请求 kafka cluster 的消息是本条消息的下一个offset对应的消息
// MessageAndMetadata 是封装了单条消息的相关信息,包括 topic, partition, 对应的消息ByteBuffer,消息的offset,key解码器,value解码类
// messageHandler 是一个回调方法, 对应了本例中的(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset)代码
messageHandler(new MessageAndMetadata(
part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
}
}
}
}

总结

有如下问题:
1.这个类是如何接收 kafka 的消息的?
通过KafkaRDD来获取单批次的数据的,KafkaRDD的compute方法返回一个迭代器,这个迭代器封装了kafka partition数据的批量抓取以及负责调用传入的消息处理回调函数并将单条处理结果返回。
其中,spark streaming 的exactly-once 消费机制是通过 KafkaRDD 来保证的,在创建KafkaRDD之前,就已经通过 currentOffset和 估算出的速率,以及每个分区的自定义最大抓取速率,和从partition的leader获取的最大offset,确定分区untilOffset的值,最终fromOffset和untilOffset构成OffsetRange,在KafkaRDD中生成的迭代器中会丢弃掉offset不在该OffsetRange内的数据,最终调用用户传入的消息处理函数,处理数据成用户想要的数据格式。
2.这个类是如何将单个partition的消息转换为 RDD单个partition的数据的?
KafkaRDD 的compute 方法 以 partition 作为参数,这个partition是 KafkaRDDPartition 的实例, 包含了分区消息的 offset range,topic, partition 等信息,该方法会返回一个KafkaRDDIterat,该类提供了访问 该分区内kafka 数据的 数据,内部通过SimpleConsumer 来从leader 节点来批量获取数据,然后再从批量数据中获取我们想要的数据(由offset range来保证)。
3.这个类是如何估算 kafka 消费速率的?
提供了 PIDRateEstimator 类, 该类通过传入batch 处理结束时间,batch 处理条数, 实际处理时间和 batch 调度时间来估算速率的。
4.这个类是如何做WAL 的?这个类做不了 WAL
05-18 13:09