本文介绍了Strimzi - 连接外部客户端的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据此处的讨论,我使用以下步骤启用外部客户端(基于 kafkajs) 在 OpenShift 上连接到 Strimzi.这些步骤来自这里.

Following on discussion here, I used the following steps to enable an external client (based on kafkajs) connect to Strimzi on OpenShift. These steps are from here.

kafka-persistent-single.yaml 修改为如下所示.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
          type: route
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

提取证书,

为了提取证书并在客户端使用它,我运行了以下命令:

Extract certificate,

To extract certificate and use it in client, I ran the following command:

kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt

请注意,我必须在我的 macOS 上使用 base64 -D 而不是 base64 -d,如文档中所示.

Note that, I had to use base64 -D on my macOS and not base64 -d as shown in documentation.

这是从他们的 npm 页面和他们的 文档改编而来的客户端.

This is the client adapted from their npm page and their documentation.

const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
  ssl : { rejectUnauthorized: false,
    ca : [fs.readFileSync('ca.crt', 'utf-8')]
  }
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

问题

当我从具有 ca.crt 的文件夹中运行 node sample.js 时,我收到一条连接被拒绝的消息.

Question

When I run node sample.js from the folder having ca.crt, I get a connection refused message.

{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}

我错过了什么?

推荐答案

经过与@ppatierno 的深入讨论,我觉得Strimzi 集群与Kafka 控制台客户端配合得很好.另一方面,kafkajs 包一直以 NOT_LEADER_FOR_PARTITION 失败.

After an extended discussion with @ppatierno, I feel that, the Strimzi cluster works well with the Kafka console clients. The kafkajs package, on the other hand, keeps failing with NOT_LEADER_FOR_PARTITION.

更新Python 客户端似乎工作没有大惊小怪;所以,我放弃了 kafkajs.

UPDATE The Python client seem to be working without a fuss; so, I am abandoning kafkajs.

这篇关于Strimzi - 连接外部客户端的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-03 19:02