Problem description
I have MSK running on AWS and I am able to send records into and out of MSK. I just want to use Kafka Connect so that records coming into MSK go to Elasticsearch. I have done the steps below, but I am not sure whether my connector is working properly, because I cannot see any records in Elasticsearch. This is the record that I am sending:
{
  "data": {
    "RequestID": 517082653,
    "ContentTypeID": 9,
    "OrgID": 16145,
    "UserID": 4,
    "PromotionStartDateTime": "2019-12-14T16:06:21Z",
    "PromotionEndDateTime": "2019-12-14T16:16:04Z",
    "SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
  },
  "metadata": {
    "timestamp": "2019-12-29T10:37:31.502042Z",
    "record-type": "data",
    "operation": "insert",
    "partition-key-type": "schema-table",
    "schema-name": "dbo",
    "table-name": "TRFSDIQueue"
  }
}
I have installed Kafka Connect like this:
wget http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent
vim /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
and changed the URL and topic name.
After that, I started Kafka Connect like this:
/usr/local/confluent/bin/connect-standalone /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
This gave me the output:
INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:62)
So nowhere have I mentioned my broker details or ZooKeeper details. How will it connect to MSK?
Please help me understand this. What am I missing? I do not do any schema transformation, so I am not modifying schema-registry.properties.
When I tried the command below, I got the following error:
/usr/local/confluent/bin/connect-standalone /home/ec2-user/kafka_2.12-2.2.1/config/connect-standalone.properties /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
[2019-12-30 03:19:30,149] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2019-12-30 03:19:30,149] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, 
name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, 
name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:175)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:382)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:261)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
Recommended answer
You put your bootstrap servers and other Connect-related properties in the etc/kafka/connect-standalone.properties file. This worker file is the first argument to connect-standalone; the connector properties file comes after it.
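As a rough sketch of what such a worker file might contain, the script below writes a minimal standalone worker configuration. The broker address, the file locations, and the choice of schemaless JSON converters are assumptions made for illustration (the real bootstrap string comes from your MSK cluster's client information); only the property names are standard Kafka Connect worker settings.

```shell
#!/bin/sh
# Sketch: write a minimal Kafka Connect standalone worker config.
# WORKER_FILE and BOOTSTRAP are hypothetical placeholders; replace them
# with your own file location and your MSK bootstrap broker string.
WORKER_FILE="${WORKER_FILE:-./connect-standalone.properties}"
BOOTSTRAP="${BOOTSTRAP:-b-1.example.kafka.us-east-1.amazonaws.com:9092}"

cat > "$WORKER_FILE" <<EOF
# Brokers the Connect worker talks to (the worker needs no ZooKeeper details)
bootstrap.servers=$BOOTSTRAP
# Assumption: records are plain JSON without an embedded schema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Standalone mode keeps source offsets in a local file
offset.storage.file.filename=/tmp/connect.offsets
EOF

echo "wrote $WORKER_FILE"
```

The resulting file would be passed as the first argument to connect-standalone, with the Elasticsearch connector properties file as the second, matching the Usage line printed in the question.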
To load the connector, you must set plugin.path in that worker file, as discussed here:
https://docs.confluent.io/current/connect/managing/community.html
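A minimal sketch of setting plugin.path, assuming the Confluent tarball was moved to /usr/local/confluent as in the question, in which case the connector JARs would normally sit under share/java. That directory and the worker file location are assumptions; check where the kafka-connect-elasticsearch directory actually lives on your machine.

```shell
#!/bin/sh
# Sketch: point a Connect worker config at the directory holding connector plugins.
# PLUGIN_DIR is an assumption based on the install path used in the question;
# WORKER_FILE is a hypothetical placeholder for your worker properties file.
PLUGIN_DIR="${PLUGIN_DIR:-/usr/local/confluent/share/java}"
WORKER_FILE="${WORKER_FILE:-./connect-standalone.properties}"

# Warn early if the directory is missing, since the worker would then
# still fail to find ElasticsearchSinkConnector.
if [ ! -d "$PLUGIN_DIR" ]; then
  echo "warning: $PLUGIN_DIR does not exist on this machine" >&2
fi

# Drop any existing plugin.path line, then append the new one.
touch "$WORKER_FILE"
grep -v '^plugin\.path=' "$WORKER_FILE" > "$WORKER_FILE.tmp" || true
mv "$WORKER_FILE.tmp" "$WORKER_FILE"
echo "plugin.path=$PLUGIN_DIR" >> "$WORKER_FILE"

# Show the setting that the worker will read on its next start.
grep '^plugin\.path=' "$WORKER_FILE"
```

Plugins are scanned when the worker starts, so the worker has to be restarted after this change. If the connector is picked up, io.confluent.connect.elasticsearch.ElasticsearchSinkConnector should no longer be missing from the "available connectors" list in the log.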
Alternatively, you can download the Confluent Hub CLI to set up any Apache Kafka connector available there (you don't need Confluent Platform, so you can ignore Schema Registry):
https://docs.confluent.io/current/connect/managing/confluent-hub/client.html
Or, as answered before, use the Confluent Kafka Connect Docker images, which have the Elasticsearch connector pre-loaded.
Also, I would suggest not using tarballs and instead using APT/YUM to install the Confluent packages:
https://docs.confluent.io/current/installation/installing_cp/index.html