Problem description
I have AWS MSK set up and I am trying to sink records from MSK to Elasticsearch. I am able to push data into MSK in JSON format, and I want to sink it to Elasticsearch. I was able to complete the setup. This is what I have done on the EC2 instance:
wget http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent
/usr/local/confluent/etc/kafka-connect-elasticsearch
After that I modified the kafka-connect-elasticsearch properties and set my Elasticsearch URL:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect
The producer sends messages in the format below:
{
  "data": {
    "RequestID": 517082653,
    "ContentTypeID": 9,
    "OrgID": 16145,
    "UserID": 4,
    "PromotionStartDateTime": "2019-12-14T16:06:21Z",
    "PromotionEndDateTime": "2019-12-14T16:16:04Z",
    "SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
  },
  "metadata": {
    "timestamp": "2019-12-29T10:37:31.502042Z",
    "record-type": "data",
    "operation": "insert",
    "partition-key-type": "schema-table",
    "schema-name": "dbo",
    "table-name": "TRFSDIQueue"
  }
}
I am a little confused about how Kafka Connect gets started here. Do I need to start it myself, and if so, how?
I also started Schema Registry as shown below, which gave me an error.
/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
When I do that, I get the error below:
[2019-12-29 13:49:17,861] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/
Please help.
As suggested in the answer, I upgraded the Kafka Connect version, but then I started getting the error below:
ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
at io.confluent.rest.Application.createServer(Application.java:201)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
... 5 more
Caused by: java.util.concurrent.TimeoutException
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
... 7 more
Recommended answer
First, Confluent Platform 3.1.2 is fairly old. I suggest you get the version that aligns with your Kafka version.
You start Kafka Connect using the appropriate connect-* scripts and properties located under the bin and etc/kafka folders.
For example,
/usr/local/confluent/bin/connect-standalone \
/usr/local/confluent/etc/kafka/connect-standalone.properties \
/usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties
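Because the messages in the question are plain JSON with no embedded schema, one way to run the command above without Schema Registry is to use the JSON converter with schemas disabled. The following is only a minimal sketch under stated assumptions: `CONF_DIR`, the two file names, and the `BOOTSTRAP` and `ES_URL` placeholder values are made up for illustration and must be replaced with the MSK bootstrap broker string and the real Elasticsearch endpoint.

```shell
#!/bin/sh
# Sketch: write a standalone worker config and a connector config for schemaless JSON.
# CONF_DIR, the file names, BOOTSTRAP and ES_URL are placeholders (assumptions).
CONF_DIR="${CONF_DIR:-${TMPDIR:-/tmp}/connect-conf}"
BOOTSTRAP="${BOOTSTRAP:-broker-1.example.invalid:9092}"
ES_URL="${ES_URL:-https://es.example.invalid}"
mkdir -p "$CONF_DIR"

cat > "$CONF_DIR/worker.properties" <<EOF
bootstrap.servers=$BOOTSTRAP
# Plain JSON without an embedded schema, so Schema Registry is not involved
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Standalone mode keeps source connector offsets in a local file
offset.storage.file.filename=/tmp/connect.offsets
EOF

cat > "$CONF_DIR/elasticsearch-sink.properties" <<EOF
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
# Let Elasticsearch infer the mapping, since the records carry no schema
schema.ignore=true
connection.url=$ES_URL
type.name=kafka-connect
EOF
```

The two generated files would then be passed to connect-standalone in place of the bundled ones. If the cluster only accepts TLS connections, the worker presumably also needs `security.protocol=SSL` and `consumer.security.protocol=SSL`, with the TLS port in the bootstrap string.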
If that works, you can move on to using the connect-distributed command instead.
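In distributed mode the connector is not passed on the command line; it is registered through the worker's REST API. As a rough sketch, assuming the worker listens on the default REST port 8083 on the same host, the same connector settings could be submitted as JSON. `CONNECT_URL`, `PAYLOAD`, `ES_URL`, and the `submit_connector` helper are hypothetical names introduced only for this example.

```shell
#!/bin/sh
# Sketch: register the sink with a distributed Connect worker through its REST API.
# CONNECT_URL, PAYLOAD and ES_URL are placeholders (assumptions).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
PAYLOAD="${PAYLOAD:-${TMPDIR:-/tmp}/elasticsearch-sink.json}"
ES_URL="${ES_URL:-https://es.example.invalid}"

# Same settings as the properties file, wrapped in the name/config JSON envelope
cat > "$PAYLOAD" <<EOF
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "AWSKafkaTutorialTopic",
    "key.ignore": "true",
    "connection.url": "$ES_URL",
    "type.name": "kafka-connect"
  }
}
EOF

# Hypothetical helper: POST the payload to the worker's /connectors endpoint
submit_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
       --data @"$PAYLOAD" "$CONNECT_URL/connectors"
}

# Call this once the connect-distributed worker is running:
# submit_connector
```

After submitting, `curl -s "$CONNECT_URL/connectors/elasticsearch-sink/status"` should report whether the connector and its task are running.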
Regarding Schema Registry, you can search its GitHub issues and find multiple people trying to get it working with MSK. The root issue is that MSK does not expose a PLAINTEXT listener and Schema Registry does not support named listeners. (This may have changed since version 5.x.)
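One thing that may be worth trying: Schema Registry can be given the broker list directly through `kafkastore.bootstrap.servers`, so that it does not have to parse broker entries from ZooKeeper via `kafkastore.connection.url`. Whether this resolves the error on a particular MSK cluster is not confirmed here. In the sketch below, `SR_CONF` and the `BOOTSTRAP` value are placeholders chosen for illustration.

```shell
#!/bin/sh
# Sketch: a Schema Registry config that talks to the brokers directly.
# SR_CONF and BOOTSTRAP are placeholders (assumptions), not real endpoints.
SR_CONF="${SR_CONF:-${TMPDIR:-/tmp}/schema-registry.properties}"
BOOTSTRAP="${BOOTSTRAP:-PLAINTEXT://broker-1.example.invalid:9092}"

cat > "$SR_CONF" <<EOF
listeners=http://0.0.0.0:8081
# Brokers are listed here instead of using kafkastore.connection.url (ZooKeeper)
kafkastore.bootstrap.servers=$BOOTSTRAP
kafkastore.topic=_schemas
EOF

# Then start the registry with the generated file, for example:
# /usr/local/confluent/bin/schema-registry-start "$SR_CONF"
```

For a TLS-only cluster the bootstrap entry would presumably use the `SSL://` prefix with the TLS port, together with `kafkastore.security.protocol=SSL`. A timeout while creating or validating the schema topic can be a sign that the registry cannot reach the brokers on the configured port or protocol.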
You could also try running the Connect and Schema Registry containers in ECS / EKS rather than extracting the tarball on an EC2 machine.