Kafka Connect cannot read a Kafka topic over SSL

Problem description

We are running Kafka Connect in our Docker swarm, with the following compose file:

cp-kafka-connect-node:
    image: confluentinc/cp-kafka-connect:5.1.0
    ports:
      - 28085:28085
    secrets:
      - kafka.truststore.jks
      - source: kafka-connect-aws-credentials
        target: /root/.aws/credentials
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
      CONNECT_LOG4J_ROOT_LEVEL: TRACE
      CONNECT_REST_PORT: 28085
      CONNECT_GROUP_ID: cp-kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: dev_cp-kafka-connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: dev_cp-kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: dev_cp-kafka-connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
      CONNECT_PLUGIN_PATH: /usr/share/java/
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
      KAFKA_HEAP_OPTS: '-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2'
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '0.50'
          memory: 4gb
      restart_policy:
        condition: on-failure
        delay: 10s
        max_attempts: 3
        window: 2000s

secrets:
  kafka.truststore.jks:
    external: true
  kafka-connect-aws-credentials:
    external: true

The Kafka Connect node starts up successfully, and I am able to set up tasks and view the status of those tasks...

I set up a connector that I called kafka-sink, and created it with the following config:

"config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "eu-central-1",
    "flush.size": "1",
    "schema.compatibility": "NONE",
    "tasks.max": "1",
    "topics": "input-topic-name",
    "s3.part.size": "5242880",
    "timezone": "UTC",
    "directory.delim": "/",
    "locale": "UK",
    "s3.compression.type": "gzip",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "kafka-sink",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "my-s3-bucket",
    "rotate.schedule.interval.ms": "60000"
  }

This task now says that it is running.

When I did not include the SSL config, specifically:

  CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
  CONNECT_SECURITY_PROTOCOL: SSL
  CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
  CONNECT_SSL_TRUSTSTORE_PASSWORD: ********

and instead pointed to a bootstrap server that was exposed with no security:

  CONNECT_BOOTSTRAP_SERVERS: insecurekafka:9092

It worked fine: it read from the appropriate input topic and wrote to the S3 bucket with default partitioning...

However, when I run it with the SSL config against my secure Kafka topic, it logs no errors and throws no exceptions, but does nothing at all, despite data continuously being pushed to the input topic...

Am I doing something wrong?

This is my first time using Kafka Connect; normally I connect to Kafka from Spring Boot apps, where you just have to specify the truststore location and password in the config.
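
For comparison, the Spring Boot setup referred to above might look roughly like this in application.yml, using Spring Boot's spring.kafka.* auto-configuration properties (a sketch; the path and password are placeholders, not values from the question):

spring:
  kafka:
    bootstrap-servers: kafka01:9093
    security:
      protocol: SSL          # assumption: the same SSL-only listener as above
    ssl:
      trust-store-location: file:/etc/kafka/kafka.truststore.jks   # hypothetical path
      trust-store-password: <password>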

Am I missing some configuration in either my compose file or my task config?

Recommended answer

I think you need to add SSL config for both the consumer and the producer. See Kafka Connect: Encrypt with SSL in the Confluent documentation. Something like this:

# Worker-level settings (Connect's internal topics and group coordination)
security.protocol=SSL
ssl.truststore.location=~/kafka.truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=~/kafka.client.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>

# Consumer overrides (used by sink connectors, such as the S3 sink here)
consumer.security.protocol=SSL
consumer.ssl.truststore.location=~/kafka.truststore.jks
consumer.ssl.truststore.password=<password>
consumer.ssl.keystore.location=~/kafka.client.keystore.jks
consumer.ssl.keystore.password=<password>
consumer.ssl.key.password=<password>

# Producer overrides (used by source connectors)
producer.security.protocol=SSL
producer.ssl.truststore.location=~/kafka.truststore.jks
producer.ssl.truststore.password=<password>
producer.ssl.keystore.location=~/kafka.client.keystore.jks
producer.ssl.keystore.password=<password>
producer.ssl.key.password=<password>
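
Since the worker in the question is configured through environment variables on the confluentinc/cp-kafka-connect image, which maps each CONNECT_-prefixed variable to the corresponding worker property (lowercased, with underscores becoming dots), the consumer overrides could be added to the compose file along these lines. This is a sketch reusing the truststore secret from the question, with no client keystore on the assumption that the brokers do not require client authentication:

    environment:
      # Existing worker-level SSL settings from the question
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
      # Maps to consumer.security.protocol etc.; used by the S3 sink's consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ********
      # Maps to producer.security.protocol etc.; only needed for source connectors
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ********

With the consumer overrides in place, the sink's consumer should connect to the SSL listener instead of silently reading nothing.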
