When producing Avro messages with reactive-kafka and avro4s, I hit a reproducible error. As soon as the client's identityMapCapacity (CachedSchemaRegistryClient) is reached, serialization fails with

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value

This is unexpected, since all messages should have the same schema - they are serializations of the same case class.
val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
  Source.queue(bufferSize, overflowStrategy)
  .via(avroProdFlow)
  .map(logResult)
  .to(Sink.ignore)
  .run()

...
queue.offer(msg)

The serializer is a KafkaAvroSerializer, instantiated with new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).
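A minimal sketch of that wiring, assuming settings.schemaRegistry holds the registry URL:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer

// Registry client whose per-subject identity map holds at most 1000 schema
// objects - the limit that the IllegalStateException above refers to.
val registryClient = new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)

// Value serializer backed by that client.
val kafkaAvroSerializer = new KafkaAvroSerializer(registryClient)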

Producing the GenericRecord:
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record   = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}

The schema is created deep inside the code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, called by io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), where I have no influence over it, so I do not know how to fix the leak. It looks to me as if the two confluent projects do not work well together.
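A small illustration of the identity-versus-structural-equality distinction that the second workaround below is about, using a hypothetical schema that is built twice (as happens when each GenericRecord carries its own Schema instance):

import java.util.IdentityHashMap

import org.apache.avro.{ Schema, SchemaBuilder }

// Two structurally identical schemas, e.g. built by two separate RecordFormat instances.
val s1: Schema = SchemaBuilder.record("Edge").fields().requiredString("id").endRecord()
val s2: Schema = SchemaBuilder.record("Edge").fields().requiredString("id").endRecord()

// An identity-keyed map treats them as two different entries.
val identityCache = new IdentityHashMap[Schema, Integer]()
identityCache.put(s1, 1)
identityCache.put(s2, 2)

println(s1 == s2)            // true  - structurally equal
println(identityCache.size)  // 2     - counted twice, eventually exceeding identityMapCapacity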

The issues I found here, here and here do not seem to cover my use case.

For me there are currently two workarounds:
  • Do not use the schema registry - obviously not a long-term solution
  • Create a custom SchemaRegistryClient that does not rely on object identity - feasible, but I would like to avoid creating more problems than I solve by reimplementing it

Is there a way to generate or cache a consistent schema depending on the message/record type and use it with my setup?

Best Answer

Edit 2017.11.20

In my case the problem was that every instance of GenericRecord carrying my message had been serialized by a different instance of RecordFormat, each containing a different instance of the Schema. The implicit resolution here generated a new instance every time:
    def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)
The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.
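A minimal sketch of that fix, assuming an Edge case class as in the question (edgeFormat is just an illustrative name):

    import com.sksamuel.avro4s.RecordFormat
    import org.apache.avro.generic.GenericRecord

    // Materialize the RecordFormat once; every toAvro call now reuses the same
    // underlying Schema instance, so the registry client only ever sees one
    // schema object per type.
    implicit val edgeFormat: RecordFormat[Edge] = RecordFormat[Edge]

    def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
      recordFormat.to(a)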

Original answer:

After waiting for a while (there is also no answer to the github issue) I had to implement my own SchemaRegistryClient. Over 90% is copied from the original CachedSchemaRegistryClient, just translated into scala. Using a scala mutable.Map fixed the memory leak. I have not performed any comprehensive tests, so use at your own risk.

    import java.util
    
    import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
    import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
    import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
    import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
    import org.apache.avro.Schema
    
    import scala.collection.mutable
    
    class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
        extends SchemaRegistryClient {
    
      // These caches use scala's mutable.Map, which keys by structural equality
      // (Schema.equals/hashCode) instead of object identity, so structurally
      // identical schemas share a single entry.
      val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
      val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
        mutable.Map(null.asInstanceOf[String] -> mutable.Map())
      val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
    
      def this(baseUrl: String, identityMapCapacity: Int) {
        this(new RestService(baseUrl), identityMapCapacity)
      }
    
      def this(baseUrls: util.List[String], identityMapCapacity: Int) {
        this(new RestService(baseUrls), identityMapCapacity)
      }
    
      def registerAndGetId(subject: String, schema: Schema): Int =
        restService.registerSchema(schema.toString, subject)
    
      def getSchemaByIdFromRegistry(id: Int): Schema = {
        val restSchema: SchemaString = restService.getId(id)
        (new Schema.Parser).parse(restSchema.getSchemaString)
      }
    
      def getVersionFromRegistry(subject: String, schema: Schema): Int = {
        val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
        response.getVersion.intValue
      }
    
      override def getVersion(subject: String, schema: Schema): Int = synchronized {
        val schemaVersionMap: mutable.Map[Schema, Integer] =
          versionCache.getOrElseUpdate(subject, mutable.Map())
    
        val version: Integer = schemaVersionMap.getOrElse(
          schema, {
            if (schemaVersionMap.size >= identityMapCapacity) {
              throw new IllegalStateException(s"Too many schema objects created for $subject!")
            }
    
            val version = new Integer(getVersionFromRegistry(subject, schema))
            schemaVersionMap.put(schema, version)
            version
          }
        )
        version.intValue()
      }
    
      override def getAllSubjects: util.List[String] = restService.getAllSubjects()
    
      override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }
    
      override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
        val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
        idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
      }
    
      override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
        val response = restService.getVersion(subject, version)
        val id       = response.getId.intValue
        val schema   = response.getSchema
        new SchemaMetadata(id, version, schema)
      }
    
      override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
        val response = restService.getLatestVersion(subject)
        val id       = response.getId.intValue
        val version  = response.getVersion.intValue
        val schema   = response.getSchema
        new SchemaMetadata(id, version, schema)
      }
    
      override def updateCompatibility(subject: String, compatibility: String): String = {
        val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
        response.getCompatibilityLevel
      }
    
      override def getCompatibility(subject: String): String = {
        val response: Config = restService.getConfig(subject)
        response.getCompatibilityLevel
      }
    
      override def testCompatibility(subject: String, schema: Schema): Boolean =
        restService.testCompatibility(schema.toString(), subject, "latest")
    
      // Keyed by structural schema equality: re-registering an identical schema
      // returns the cached id instead of occupying another slot.
      override def register(subject: String, schema: Schema): Int = synchronized {
        val schemaIdMap: mutable.Map[Schema, Integer] =
          schemaCache.getOrElseUpdate(subject, mutable.Map())
    
        val id = schemaIdMap.getOrElse(
          schema, {
            if (schemaIdMap.size >= identityMapCapacity)
              throw new IllegalStateException(s"Too many schema objects created for $subject!")
            val id: Integer = new Integer(registerAndGetId(subject, schema))
            schemaIdMap.put(schema, id)
            idCache(null).put(id, schema)
            id
          }
        )
        id.intValue()
      }
    }
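
    Hypothetical wiring of the custom client (it exposes the same (baseUrl, identityMapCapacity) constructor as the stock client, so it can be dropped into the serializer from the question):

    import io.confluent.kafka.serializers.KafkaAvroSerializer

    // Same setup as before, just with the client that caches by schema equality.
    val registryClient = new CachingSchemaRegistryClient(settings.schemaRegistry, 1000)
    val serializer     = new KafkaAvroSerializer(registryClient)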
    
