I am trying to use the Kafka Connect Elasticsearch connector, without success. It crashes with the following error:

[2018-11-21 14:48:29,096] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector , available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.1', encodedVersion=1.0.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}

I have unpacked the plugin's build into a kafka subfolder and have the following line in connect-standalone.properties:
plugin.path=/opt/kafka/plugins/kafka-connect-elasticsearch-5.0.1/src/main/java/io/confluent/connect/elasticsearch

I can see the various connectors inside that folder, but Kafka Connect does not load them; it does load the standard connectors, as shown here:
[2018-11-21 14:56:28,258] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:136)
[2018-11-21 14:56:28,259] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-11-21 14:56:28,260] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)

How do I register the connectors correctly?

Best Answer

I ran the jdbc connector on Kafka in Docker manually yesterday, without the Confluent platform and so on, just to learn how these things work underneath. I did not have to build the jar myself or anything like that. Hopefully it is relevant to you - what I did was (I will skip the Docker parts, i.e. how to mount the directory with the connector, etc.):

  • Download the connector from https://www.confluent.io/connector/kafka-connect-jdbc/ and unzip the archive
  • Put the contents of the zip in a directory on the path configured in the properties file (shown in the third point below) -
    plugin.path=/plugins
    

    So the tree looks like this:
    /plugins/
    └── jdbcconnector
        └──assets
        └──doc
        └──etc
        └──lib
    

    Note the lib directory where the dependencies live; one of them is kafka-connect-jdbc-5.0.0.jar
  • Now you can try to run the connector
    ./connect-standalone.sh connect-standalone.properties jdbc-connector-config.properties
    

    connect-standalone.properties holds the common properties needed by kafka-connect; in my case:
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    plugin.path=/plugins
    rest.port=8086
    rest.host.name=127.0.0.1
    

    jdbc-connector-config.properties is more involved, since it is the configuration for this particular connector - you need to dig into the connector's documentation; for the jdbc source it is https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html (a rough example sketch follows this list).
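
For completeness, a minimal sketch of what jdbc-connector-config.properties could look like for the JDBC source connector - the connection URL, credentials, table and topic prefix here are placeholders, so check the documentation linked above for the options your setup actually needs:

    # Hypothetical minimal config for the Confluent JDBC source connector;
    # all connection details and table/topic names below are placeholders.
    name=jdbc-source-example
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:postgresql://localhost:5432/mydb
    connection.user=user
    connection.password=password
    table.whitelist=my_table
    mode=incrementing
    incrementing.column.name=id
    topic.prefix=jdbc-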
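
Applied to the Elasticsearch connector from the question, the same layout means plugin.path should point at the parent plugins directory that contains the packaged connector (with its lib folder of jars), not at the src/main/java source tree. A rough sketch, where the archive name and paths are assumptions based on the question:

    # Assumption: the packaged connector zip was downloaded from Confluent Hub;
    # the exact archive name depends on the version you fetch.
    mkdir -p /opt/kafka/plugins
    unzip confluentinc-kafka-connect-elasticsearch-5.0.1.zip -d /opt/kafka/plugins/
    # connect-standalone.properties then points at the parent directory:
    # plugin.path=/opt/kafka/plugins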