本文介绍了在正在运行的Spark Streaming应用程序中处理架构更改的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找在Spark 1.6上使用DataFrames API构建一个Spark Streaming应用程序.在深入研究之前,我希望有人能帮助我理解DataFrames如何处理具有不同架构的数据.

I am looking to build a Spark Streaming application using the DataFrames API on Spark 1.6. Before I get too far down the rabbit hole, I was hoping someone could help me understand how DataFrames deals with data having a different schema.

这个想法是消息将通过Avro模式流入Kafka.我们应该能够以向后兼容的方式来发展模式,而不必重启流应用程序(应用程序逻辑仍将起作用).

The idea is that messages will flow into Kafka with an Avro schema. We should be able to evolve the schema in backwards compatible ways without having to restart the streaming application (the application logic will still work).

使用模式注册表反序列化新版本的消息以及使用KafkaUtils创建直接流和AvroKafkaDecoder(来自Confluent)将消息中嵌入的模式ID进行反序列化似乎很简单.这让我远不止拥有DStream.

It appears trivial to deserialize new versions of messages using a schema registry and the schema id embedded in the message using the KafkaUtils to create a direct stream and the AvroKafkaDecoder (from Confluent). That gets me as far as having a DStream.

问题1:在该DStream中,将存在具有不同版本架构的对象.因此,当我将每一个都转换为Row对象时,我应该传递一个最新的阅读器模式以正确迁移数据,并且需要将最新的模式传递给sqlContext.createDataFrame(rowRdd,schema)调用. DStream中的对象的类型为GenericData.Record,据我所知,没有简单的方法可以知道哪个是最新版本.我看到了两种可能的解决方案,一种是调用架构注册表以在每个微批处理中获取架构的最新版本.另一种是修改解码器以附加模式ID.然后,我可以遍历rdd来找到最高的id并从本地缓存中获取架构.

Problem #1:Within that DStream there will be objects with different versions of the schema. So when I translate each one into a Row object I should be passing in a reader schema that is the latest one to properly migrate the data, and I need to pass the latest schema into the sqlContext.createDataFrame(rowRdd, schema) call. The objects in the DStream are of type GenericData.Record, and as far as I can tell there is no easy way to tell which is the most recent version. I see 2 possible solutions, one is to call the schema registry to get the latest version of the schema on every microbatch. The other is to modify the decoder to attach the schema id. I could then iterate over the rdd to find the highest id and get the schema from a local cache.

我希望有人已经以一种可重用的方式很好地解决了这个问题.

I was hoping someone had already solved this nicely in a reusable way.

问题/问题2:对于每个分区,Spark将从Kafka提取不同的执行程序.当一个执行者收到与其他执行者不同的最新"架构时,我的应用程序会发生什么.一个执行者创建的DataFrame在相同的时间范围内将具有与另一个执行者不同的模式.我实际上不知道这是否是一个真正的问题.我在可视化数据流时遇到麻烦,哪种操作会带来问题.如果存在问题,则意味着执行者之间需要进行一些数据共享,这听起来既复杂又效率低下.

Problem/Question #2:Spark is going to have a different executor pulling from Kafka for each partition. What happens to my application when one executor receives a different "latest" schema than the others. The DataFrame created by one executor will have a different schema than another for the same time window. I don't actually know if this is a real problem or not. I am having trouble visualizing the flow of data, and what kinds of operations would present problems. If it is a problem, it would imply that there needs to be some data sharing between executors and that sounds both complicated and inefficient.

我需要为此担心吗?如果可以,该如何解决架构差异?

Do I need to worry about this? If I do, how to I resolve the schema differences?

谢谢,-本

推荐答案

我相信我已经解决了这个问题.我正在使用Confluent的架构注册表和KafkaAvroDecoder.简化的代码如下:

I believe I have resolved this. I am using Confluent's schema registry and KafkaAvroDecoder. Simplified code looks like:

// Get the latest schema here. This schema will be used inside the
// closure below to ensure that all executors are using the same
// version for this time slice.
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
val m = sr.getLatestSchemaMetadata(subject)
val schemaId = m.getId
val schemaString = m.getSchema

val outRdd = rdd.mapPartitions(partitions => {
  // Note: we cannot use the schema registry from above because this code
  // will execute on remote machines, requiring the schema registry to be
  // serialized. We could use a pool of these.
  val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
  val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
  val parser = new Schema.Parser()
  val avroSchema = parser.parse(schemaString)
  val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema)

  partitions.map(input => {
    // Decode the message using the latest version of the schema.
    // This will apply Avro's standard schema evolution rules
    // (for compatible schemas) to migrate the message to the
    // latest version of the schema.
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record]
    // Convert record into a DataFrame with columns according to the schema
    avroRecordConverter(record).asInstanceOf[Row]
  })
})

// Get a Spark StructType representation of the schema to apply
// to the DataFrame.
val sparkSchema = AvroSchemaConverter.toSqlType(
      new Schema.Parser().parse(schemaString)
    ).dataType.asInstanceOf[StructType]
sqlContext.createDataFrame(outRdd, sparkSchema)

这篇关于在正在运行的Spark Streaming应用程序中处理架构更改的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 14:42