我目前正在使用Zeebe工作流执行引擎,并且正在尝试正确地将其中一个导出器从使用TOML配置文件升级到YAML文件。这是原始的TOML:
[[exporters]]
id = "kafka"
className = "io.zeebe.exporters.kafka.KafkaExporter"
# Top level exporter arguments
[exporters.args]
# Controls how many records can have been sent to the Kafka broker without
# any acknowledgment Once the limit is reached the exporter will block and
# wait until either one record is acknowledged
maxInFlightRecords = 1000
# How often should the exporter drain the in flight records' queue of completed
# requests and update the broker with the guaranteed latest exported position
inFlightRecordCheckIntervalMs = 1000
# Producer specific configuration
[exporters.args.producer]
# The list of initial Kafka broker contact points. The format should be the same
# one as the ProducerConfig expects, i.e. "host:port"
# Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
servers = [ "kafka:9092" ]
# Controls how long the producer will wait for a request to be acknowledged by
# the Kafka broker before retrying it
# Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
requestTimeoutMs = 5000
# Grace period when shutting down the producer in milliseconds
closeTimeoutMs = 5000
# Producer client identifier
clientId = "zeebe"
# Max concurrent requests to the Kafka broker; note that in flight records are batched such that
# in one request you can easily have a thousand records, depending on the producer's batch
# configuration.
maxConcurrentRequests = 3
# Any setting under the following section will be passed verbatim to
# ProducerConfig; you can use this to configure authentication, compression,
# etc. Note that you can overwrite some important settings, so avoid changing
# idempotency, delivery timeout, and retries, unless you know what you're doing
[exporters.args.producer.config]
# Controls which records are pushed to Kafka and to which topic
# Each entry is a sub-map which can contain two entries:
# type => [string]
# topic => string
#
# Topic is the topic to which the record with the given value type
# should be sent to, e.g. for a deployment record below we would
# send the record to "zeebe-deployment" topic.
#
# Type is a list of accepted record types, allowing you to filter
# if you want nothing ([]), commands (["command"]), events (["events"]),
# or rejections (["rejection"]), or a combination of the three, e.g.
# ["command", "event"].
[exporters.args.records]
# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults = { type = [ "event" ], topic = "zeebe" }
# For records with a value of type DEPLOYMENT
deployment = { topic = "zeebe-deployment" }
# For records with a value of type INCIDENT
incident = { topic = "zeebe-incident" }
# For records with a value of type JOB_BATCH
jobBatch = { topic = "zeebe-job-batch" }
# For records with a value of type JOB
job = { topic = "zeebe-job" }
# For records with a value of type MESSAGE
message = { topic = "zeebe-message" }
# For records with a value of type MESSAGE_SUBSCRIPTION
messageSubscription = { topic = "zeebe-message-subscription" }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
messageStartEventSubscription = { topic = "zeebe-message-subscription-start-event" }
# For records with a value of type RAFT
raft = { topic = "zeebe-raft" }
# For records with a value of type TIMER
timer = { topic = "zeebe-timer" }
# For records with a value of type VARIABLE
variable = { topic = "zeebe-variable" }
# For records with a value of type WORKFLOW_INSTANCE
workflowInstance = { topic = "zeebe-workflow" }
# For records with a value of type WORKFLOW_INSTANCE_SUBSCRIPTION
workflowInstanceSubscription = { topic = "zeebe-workflow-subscription" }
目前,这是我将其转换为YAML的方式:
- id: kafka
className: io.zeebe.exporters.kafka.KafkaExporter
args:
maxInFlightRecords: 1000
inFlightRecordCheckIntervalMs: 1000
producer:
servers:
- 'localhost:9092'
requestTimeoutMs: 5000
closeTimeoutMs: 5000
clientId: zeebe
maxConcurrentRequests: 3
config: {}
records:
defaults:
type:
- event
topic: zeebe
deployment:
topic: zeebe-deployment
incident:
topic: zeebe-incident
jobBatch:
topic: zeebe-job-batch
job:
topic: zeebe-job
message:
topic: zeebe-message
messageSubscription:
topic: zeebe-message-subscription
messageStartEventSubscription:
topic: zeebe-message-subscription-start-event
raft:
topic: zeebe-raft
timer:
topic: zeebe-timer
variable:
topic: zeebe-variable
workflowInstance:
type:
- event
topic: zeebe-workflow
workflowInstanceSubscription:
topic: zeebe-workflow-subscription
在这种方法下,我目前与出口商有关:
@Override
public void configure(Context context) {
this.logger = context.getLogger();
this.id = context.getConfiguration().getId();
final TomlConfig tomlConfig = context.getConfiguration().instantiate(TomlConfig.class);
this.config = this.configParser.parse(tomlConfig);
this.recordHandler = new RecordHandler(this.config.getRecords());
context.setFilter(new KafkaRecordFilter(this.config.getRecords())); // <-- This line specifically
this.logger.debug("Configured exporter {}", this.id);
}
当我查看代理如何解析YAML时,它显示此JSON对象:
"1" : {
"jarPath" : null,
"className" : "io.zeebe.exporters.kafka.KafkaExporter",
"args" : {
"maxInFlightRecords" : 1000,
"inFlightRecordCheckIntervalMs" : 1000,
"producer" : {
"servers" : {
"0" : "localhost:9092"
},
"requestTimeoutMs" : 5000,
"closeTimeoutMs" : 5000,
"clientId" : "zeebe",
"maxConcurrentRequests" : 3
},
"records" : {
"defaults" : {
"type" : {
"0" : "event"
},
"topic" : "zeebe"
},
"deployment" : {
"topic" : "zeebe-deployment"
},
"incident" : {
"topic" : "zeebe-incident"
},
"jobBatch" : {
"topic" : "zeebe-job-batch"
},
"job" : {
"topic" : "zeebe-job"
},
"message" : {
"topic" : "zeebe-message"
},
"messageSubscription" : {
"topic" : "zeebe-message-subscription"
},
"messageStartEventSubscription" : {
"topic" : "zeebe-message-subscription-start-event"
},
"raft" : {
"topic" : "zeebe-raft"
},
"timer" : {
"topic" : "zeebe-timer"
},
"variable" : {
"topic" : "zeebe-variable"
},
"workflowInstance" : {
"type" : {
"0" : "event"
},
"topic" : "zeebe-workflow"
},
"workflowInstanceSubscription" : {
"topic" : "zeebe-workflow-subscription"
}
}
},
特别是这部分:
"producer" : {
"servers" : {
"0" : "localhost:9092"
},
应该:
"producer" : {
"servers" : [ "localhost:32801" ]
},
我也从调用此导出程序的主代理那里收到此错误:
Caused by: java.lang.IllegalStateException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path $.producer.servers
at com.google.gson.internal.bind.JsonTreeReader.expect(JsonTreeReader.java:163) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.JsonTreeReader.beginArray(JsonTreeReader.java:72) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:932) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:1003) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:975) ~[gson-2.8.6.jar:?]
at io.zeebe.broker.exporter.context.ExporterConfiguration.instantiate(ExporterConfiguration.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.exporters.kafka.KafkaExporter.configure(KafkaExporter.java:81) ~[zeebe-kafka-exporter-1.2.0-SNAPSHOT-uber.jar:1.2.0-SNAPSHOT]
at io.zeebe.broker.exporter.repo.ExporterRepository.validate(ExporterRepository.java:91) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:54) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:84) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.system.partitions.ZeebePartition.<init>(ZeebePartition.java:138) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$partitionsStep$17(Broker.java:339) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.partitionsStep(Broker.java:346) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$initStart$9(Broker.java:183) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.internalStart(Broker.java:135) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.util.LogUtil.doWithMDC(LogUtil.java:21) ~[zeebe-util-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.start(Broker.java:115) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.StandaloneBroker.run(StandaloneBroker.java:59) ~[zeebe-distribution-0.23.1.jar:0.23.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 5 more
因此,基本上,我目前想知道的是,这是YAML格式还是代码本身存在问题?我对导出器进行了单元测试,甚至将依赖项升级到当前版本(这引起了重大更改),但是测试都通过了。出口商的代码库在这里:https://github.com/zeebe-io/zeebe-kafka-exporter
经纪人在这里:https://github.com/zeebe-io/zeebe。任何帮助将不胜感激。
最佳答案
这是转换配置时的错误。我为此https://github.com/zeebe-io/zeebe/issues/4552创建了一个错误问题
请注意,导出器现在是地图而不是列表,这意味着您应按以下方式进行配置:
exporters:
kafka:
className: io.zeebe.exporters.kafka.KafkaExporter
args:
代替:
exporters:
- id: kafka
className: io.zeebe.exporters.kafka.KafkaExporter
args:
关于java - 将YAML转换为Java对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61826566/