我目前正在使用Zeebe工作流执行引擎,并且正在尝试正确地将其中一个导出器从使用TOML配置文件升级到YAML文件。这是原始的TOML:

[[exporters]]
id = "kafka"
className = "io.zeebe.exporters.kafka.KafkaExporter"

# Top level exporter arguments
[exporters.args]
# Controls how many records can have been sent to the Kafka broker without
# any acknowledgment Once the limit is reached the exporter will block and
# wait until either one record is acknowledged
maxInFlightRecords = 1000
# How often should the exporter drain the in flight records' queue of completed
# requests and update the broker with the guaranteed latest exported position
inFlightRecordCheckIntervalMs = 1000

# Producer specific configuration
[exporters.args.producer]
# The list of initial Kafka broker contact points. The format should be the same
# one as the ProducerConfig expects, i.e. "host:port"
# Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
servers = [ "kafka:9092" ]
# Controls how long the producer will wait for a request to be acknowledged by
# the Kafka broker before retrying it
# Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
requestTimeoutMs = 5000
# Grace period when shutting down the producer in milliseconds
closeTimeoutMs = 5000
# Producer client identifier
clientId = "zeebe"
# Max concurrent requests to the Kafka broker; note that in flight records are batched such that
# in one request you can easily have a thousand records, depending on the producer's batch
# configuration.
maxConcurrentRequests = 3

# Any setting under the following section will be passed verbatim to
# ProducerConfig; you can use this to configure authentication, compression,
# etc. Note that you can overwrite some important settings, so avoid changing
# idempotency, delivery timeout, and retries, unless you know what you're doing
[exporters.args.producer.config]

# Controls which records are pushed to Kafka and to which topic
# Each entry is a sub-map which can contain two entries:
#     type => [string]
#     topic => string
#
# Topic is the topic to which the record with the given value type
# should be sent to, e.g. for a deployment record below we would
# send the record to "zeebe-deployment" topic.
#
# Type is a list of accepted record types, allowing you to filter
# if you want nothing ([]), commands (["command"]), events (["events"]),
# or rejections (["rejection"]), or a combination of the three, e.g.
# ["command", "event"].
[exporters.args.records]
# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults = { type = [ "event" ], topic = "zeebe" }
# For records with a value of type DEPLOYMENT
deployment = { topic = "zeebe-deployment" }
# For records with a value of type INCIDENT
incident = { topic = "zeebe-incident" }
# For records with a value of type JOB_BATCH
jobBatch = { topic = "zeebe-job-batch" }
# For records with a value of type JOB
job = { topic = "zeebe-job" }
# For records with a value of type MESSAGE
message = { topic = "zeebe-message" }
# For records with a value of type MESSAGE_SUBSCRIPTION
messageSubscription = { topic = "zeebe-message-subscription" }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
messageStartEventSubscription = { topic = "zeebe-message-subscription-start-event" }
# For records with a value of type RAFT
raft = { topic = "zeebe-raft" }
# For records with a value of type TIMER
timer = { topic = "zeebe-timer" }
# For records with a value of type VARIABLE
variable = { topic = "zeebe-variable" }
# For records with a value of type WORKFLOW_INSTANCE
workflowInstance = { topic = "zeebe-workflow" }
# For records with a value of type WORKFLOW_INSTANCE_SUBSCRIPTION
workflowInstanceSubscription = { topic = "zeebe-workflow-subscription" }


目前,这是我将其转换为YAML的方式:

- id: kafka
  className: io.zeebe.exporters.kafka.KafkaExporter
  args:
    maxInFlightRecords: 1000
    inFlightRecordCheckIntervalMs: 1000
    producer:
      servers:
        - 'localhost:9092'
      requestTimeoutMs: 5000
      closeTimeoutMs: 5000
      clientId: zeebe
      maxConcurrentRequests: 3
      config: {}
    records:
      defaults:
        type:
          - event
        topic: zeebe
      deployment:
        topic: zeebe-deployment
      incident:
        topic: zeebe-incident
      jobBatch:
        topic: zeebe-job-batch
      job:
        topic: zeebe-job
      message:
        topic: zeebe-message
      messageSubscription:
        topic: zeebe-message-subscription
      messageStartEventSubscription:
        topic: zeebe-message-subscription-start-event
      raft:
        topic: zeebe-raft
      timer:
        topic: zeebe-timer
      variable:
        topic: zeebe-variable
      workflowInstance:
        type:
          - event
        topic: zeebe-workflow
      workflowInstanceSubscription:
        topic: zeebe-workflow-subscription


在这种方法下,我目前与出口商有关:

@Override
public void configure(Context context) {
  this.logger = context.getLogger();
  this.id = context.getConfiguration().getId();

  final TomlConfig tomlConfig = context.getConfiguration().instantiate(TomlConfig.class);
  this.config = this.configParser.parse(tomlConfig);
  this.recordHandler = new RecordHandler(this.config.getRecords());

  context.setFilter(new KafkaRecordFilter(this.config.getRecords())); // <-- This line specifically
  this.logger.debug("Configured exporter {}", this.id);
}


当我查看代理如何解析YAML时,它显示此JSON对象:

"1" : {
  "jarPath" : null,
  "className" : "io.zeebe.exporters.kafka.KafkaExporter",
  "args" : {
    "maxInFlightRecords" : 1000,
    "inFlightRecordCheckIntervalMs" : 1000,
    "producer" : {
      "servers" : {
        "0" : "localhost:9092"
      },
      "requestTimeoutMs" : 5000,
      "closeTimeoutMs" : 5000,
      "clientId" : "zeebe",
      "maxConcurrentRequests" : 3
    },
    "records" : {
      "defaults" : {
        "type" : {
          "0" : "event"
        },
        "topic" : "zeebe"
      },
      "deployment" : {
        "topic" : "zeebe-deployment"
      },
      "incident" : {
        "topic" : "zeebe-incident"
      },
      "jobBatch" : {
        "topic" : "zeebe-job-batch"
      },
      "job" : {
        "topic" : "zeebe-job"
      },
      "message" : {
        "topic" : "zeebe-message"
      },
      "messageSubscription" : {
        "topic" : "zeebe-message-subscription"
      },
      "messageStartEventSubscription" : {
        "topic" : "zeebe-message-subscription-start-event"
      },
      "raft" : {
        "topic" : "zeebe-raft"
      },
      "timer" : {
        "topic" : "zeebe-timer"
      },
      "variable" : {
        "topic" : "zeebe-variable"
      },
      "workflowInstance" : {
        "type" : {
          "0" : "event"
        },
        "topic" : "zeebe-workflow"
      },
      "workflowInstanceSubscription" : {
        "topic" : "zeebe-workflow-subscription"
      }
    }
  },


特别是这部分:

"producer" : {
  "servers" : {
    "0" : "localhost:9092"
  },


应该:

"producer" : {
      "servers" : [ "localhost:32801" ]
    },


我也从调用此导出程序的主代理那里收到此错误:

Caused by: java.lang.IllegalStateException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path $.producer.servers
at com.google.gson.internal.bind.JsonTreeReader.expect(JsonTreeReader.java:163) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.JsonTreeReader.beginArray(JsonTreeReader.java:72) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:932) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:1003) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:975) ~[gson-2.8.6.jar:?]
at io.zeebe.broker.exporter.context.ExporterConfiguration.instantiate(ExporterConfiguration.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.exporters.kafka.KafkaExporter.configure(KafkaExporter.java:81) ~[zeebe-kafka-exporter-1.2.0-SNAPSHOT-uber.jar:1.2.0-SNAPSHOT]
at io.zeebe.broker.exporter.repo.ExporterRepository.validate(ExporterRepository.java:91) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:54) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:84) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.system.partitions.ZeebePartition.<init>(ZeebePartition.java:138) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$partitionsStep$17(Broker.java:339) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.partitionsStep(Broker.java:346) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$initStart$9(Broker.java:183) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.internalStart(Broker.java:135) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.util.LogUtil.doWithMDC(LogUtil.java:21) ~[zeebe-util-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.start(Broker.java:115) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.StandaloneBroker.run(StandaloneBroker.java:59) ~[zeebe-distribution-0.23.1.jar:0.23.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 5 more


因此,基本上,我目前想知道的是,这是YAML格式还是代码本身存在问题?我对导出器进行了单元测试,甚至将依赖项升级到当前版本(这引起了重大更改),但是测试都通过了。出口商的代码库在这里:https://github.com/zeebe-io/zeebe-kafka-exporter
经纪人在这里:https://github.com/zeebe-io/zeebe。任何帮助将不胜感激。

最佳答案

这是转换配置时的错误。我为此https://github.com/zeebe-io/zeebe/issues/4552创建了一个错误问题

请注意,导出器现在是地图而不是列表,这意味着您应按以下方式进行配置:

exporters:
  kafka:
    className: io.zeebe.exporters.kafka.KafkaExporter
    args:


代替:

exporters:
  - id: kafka
    className: io.zeebe.exporters.kafka.KafkaExporter
    args:

关于java - 将YAML转换为Java对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61826566/

10-08 23:27