Question
I have MSK running on AWS, and I am able to send records to and receive records from MSK. I just want to use Kafka Connect so that records coming into MSK are forwarded to Elasticsearch. I have done the things below, but I am not sure whether my connector is working properly, because I cannot see any records in Elasticsearch. This is the record I am sending:
{
  "data": {
    "RequestID": 517082653,
    "ContentTypeID": 9,
    "OrgID": 16145,
    "UserID": 4,
    "PromotionStartDateTime": "2019-12-14T16:06:21Z",
    "PromotionEndDateTime": "2019-12-14T16:16:04Z",
    "SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
  },
  "metadata": {
    "timestamp": "2019-12-29T10:37:31.502042Z",
    "record-type": "data",
    "operation": "insert",
    "partition-key-type": "schema-table",
    "schema-name": "dbo",
    "table-name": "TRFSDIQueue"
  }
}
I installed Kafka Connect like this:
wget http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent
vim /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
and changed the connection URL and topic name in it. After that I started Kafka Connect like below:
/usr/local/confluent/bin/connect-standalone /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
This gave me the output:
INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:62)
So nowhere have I mentioned my broker details or ZooKeeper details; how will it connect to MSK?
Please help me understand this. What am I missing? I am not doing any schema transformation, so I have not modified schema-registry.properties.
When I tried the command below, I got the following error:
/usr/local/confluent/bin/connect-standalone /home/ec2-user/kafka_2.12-2.2.1/config/connect-standalone.properties /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
[2019-12-30 03:19:30,149] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2019-12-30 03:19:30,149] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, 
name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, 
name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.2.0-cp1', encodedVersion=2.2.0-cp1, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:175)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:382)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:261)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
Answer
You put your bootstrap servers and Connect-related properties in the etc/kafka/connect-standalone.properties file.
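For example, a minimal standalone worker file might look like the sketch below. The broker endpoints are placeholders (use the bootstrap string from your MSK cluster's AWS console page), and since the record shown above is plain JSON without an embedded schema, the JSON converters are configured with schemas disabled:

```properties
# etc/kafka/connect-standalone.properties -- illustrative values only
bootstrap.servers=b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092,b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092

# The incoming records are schemaless JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Where standalone mode persists source offsets
offset.storage.file.filename=/tmp/connect.offsets
```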
To load the connector, you must use plugin.path, as discussed here:
https://docs.confluent.io/current/connect/managing/community.html
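Concretely, with the tarball unpacked to /usr/local/confluent as above, the Elasticsearch connector jars live under share/java, so one option (paths are assumptions based on that layout) is to point the worker at that directory:

```properties
# Added to etc/kafka/connect-standalone.properties
# share/java is where the Confluent tarball ships its bundled connectors,
# including kafka-connect-elasticsearch
plugin.path=/usr/local/confluent/share/java
```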
You can also instead download the Confluent Hub CLI to set up any Apache Kafka connector available there (you do not require Confluent Platform, so you can ignore the Schema Registry):
https://docs.confluent.io/current/connect/managing/confluent-hub/client.html
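As a sketch, installing the Elasticsearch sink with the Hub CLI looks like this (the component coordinates follow the Hub's publisher/name:version convention; the CLI will prompt for, or can be told, which worker config files to update):

```shell
# Install the Elasticsearch sink connector and register it on the
# worker's plugin.path; "latest" resolves to the newest published version
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
```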
Or, as answered before, use the Confluent Kafka Connect Docker images, which have the Elasticsearch connector pre-loaded.
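A minimal sketch of running that image against MSK, assuming the standard CONNECT_* environment variables of the cp-kafka-connect image (every hostname and topic name here is a placeholder to replace with your own):

```shell
# Distributed-mode Connect worker in Docker; the image translates
# CONNECT_* env vars into worker properties
docker run -d --name connect -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS=b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092 \
  -e CONNECT_GROUP_ID=connect-cluster \
  -e CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=_connect-status \
  -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=localhost \
  confluentinc/cp-kafka-connect:5.2.0
```

Once the worker is up, connectors are created by POSTing JSON to its REST API on port 8083 rather than by passing properties files on the command line.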
Also, I would suggest not using tarballs and instead using APT/YUM to install the Confluent packages:
https://docs.confluent.io/current/installation/installing_cp/index.html