Problem description
I have been trying to deploy Kafka using Helm charts, so I defined a NodePort service for the Kafka pods. I checked the console Kafka producer and consumer with the same host and port - they work properly. However, when I create a Spark application as the data consumer and Kafka as the producer, they are not able to connect to the Kafka service. I used the minikube ip (instead of the node ip) as the host together with the service NodePort. In the Spark logs I saw that the NodePort service resolves the endpoints, but the brokers are discovered with pod addresses and ports:
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Discovered group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null)
INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.20:9092) could not be established. Broker may not be available.
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Group coordinator 172.17.0.20:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2 (/172.17.0.20:9092) could not be established. Broker may not be available.
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 0 (/172.17.0.12:9092) could not be established. Broker may not be available.
How can I change this behavior?
The NodePort service definition looks like this:
kind: Service
apiVersion: v1
metadata:
  name: kafka-service
spec:
  selector:
    app: cp-kafka
    release: my-confluent-oss
  ports:
    - protocol: TCP
      targetPort: 9092
      port: 32400
      nodePort: 32400
  type: NodePort
Spark consumer configuration:
def kafkaParams() = Map[String, Object](
  "bootstrap.servers" -> "192.168.99.100:32400",
  "schema.registry.url" -> "http://192.168.99.100:8081",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "avro_data",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
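For context, a minimal sketch of how these parameters would typically be plugged into Spark's kafka-0-10 integration; the topic name avro_topic, the app name, and the batch interval are illustrative assumptions, not taken from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("avro-consumer").setMaster("local[*]")
val ssc  = new StreamingContext(conf, Seconds(5))

// KafkaAvroDeserializer yields generic Avro objects, hence AnyRef values.
val stream = KafkaUtils.createDirectStream[String, AnyRef](
  ssc,
  PreferConsistent,
  Subscribe[String, AnyRef](Seq("avro_topic"), kafkaParams())
)

// Print each consumed value; this is where the connection warnings above show up.
stream.foreachRDD(rdd => rdd.foreach(r => println(r.value())))
ssc.start()
ssc.awaitTermination()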
Kafka producer configuration:
props.put("bootstrap.servers", "192.168.99.100:32400")
props.put("client.id", "avro_data")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://192.168.99.100:32500")
All the K8s services for Kafka:
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-service NodePort 10.99.113.234 <none> 32400:32400/TCP 6m34s
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 27d
my-confluent-oss-cp-kafka ClusterIP 10.100.156.108 <none> 9092/TCP 102m
my-confluent-oss-cp-kafka-connect ClusterIP 10.99.78.89 <none> 8083/TCP 102m
my-confluent-oss-cp-kafka-headless ClusterIP None <none> 9092/TCP 102m
my-confluent-oss-cp-kafka-rest ClusterIP 10.100.152.109 <none> 8082/TCP 102m
my-confluent-oss-cp-ksql-server ClusterIP 10.96.249.202 <none> 8088/TCP 102m
my-confluent-oss-cp-schema-registry ClusterIP 10.109.27.45 <none> 8081/TCP 102m
my-confluent-oss-cp-zookeeper ClusterIP 10.102.182.90 <none> 2181/TCP 102m
my-confluent-oss-cp-zookeeper-headless ClusterIP None <none> 2888/TCP,3888/TCP 102m
schema-registry-service NodePort 10.103.100.64 <none> 32500:32500/TCP 33m
zookeeper-np NodePort 10.98.180.130 <none> 32181:32181/TCP 53m
Recommended answer
I had a similar issue when I was trying to access a Kafka broker (cp-helm-chart) running on minikube from outside.
Here is how I resolved it. Do the following before you install the chart with helm install from your local repository:
- Edit the file https://github.com/confluentinc/cp-helm-charts/blob/master/charts/cp-kafka/values.yaml
- Search for nodeport: and change its enabled field to true:
  nodeport:
    enabled: true
- Uncomment these two lines by removing the #:
  "advertised.listeners": |-
    EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))
- Replace ${HOST_IP} with your minikube ip (run minikube ip to retrieve your k8s host ip, e.g. 196.169.99.100)
- Replace ${KAFKA_BROKER_ID} with the broker id (if only one broker is running, it is just 0 by default)
- In the end it looks like this (a fuller excerpt follows this list):
  "advertised.listeners": |-
    EXTERNAL://196.169.99.100:31090
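For reference, a sketch of how the relevant cp-kafka values.yaml section looks after these edits; the servicePort and firstListenerPort values are the chart's defaults as I recall its layout, not values from the question:

nodeport:
  enabled: true             # expose each broker through its own NodePort
  servicePort: 19092
  firstListenerPort: 31090  # broker 0 -> 31090, broker 1 -> 31091, ...

configurationOverrides:
  "advertised.listeners": |-
    EXTERNAL://196.169.99.100:31090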
Now you can access the Kafka broker running within the k8s cluster from outside by pointing bootstrap.servers to 196.169.99.100:31090.
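Applied to the configurations from the question, that just means swapping the bootstrap address on both sides, for example:

// Consumer side (in kafkaParams):
"bootstrap.servers" -> "196.169.99.100:31090",

// Producer side:
props.put("bootstrap.servers", "196.169.99.100:31090")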