Problem description

I have written a very simple pipeline in Apache Beam, shown below, to read data from my Kafka cluster on Confluent Cloud:

        import java.util.HashMap;
        import java.util.Map;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.kafka.KafkaIO;
        import org.apache.beam.sdk.transforms.Values;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;

        Pipeline pipeline = Pipeline.create(options);

        // Extra consumer properties merged into the KafkaIO consumer config
        Map<String, Object> propertyBuilder = new HashMap<>();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.readBytes()
                .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
                .withTopic("gcp-ingestion-1")
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .updateConsumerProperties(propertyBuilder)
                .withoutMetadata()) // PCollection<KV<byte[], byte[]>>
            .apply(Values.<byte[]>create()); // PCollection<byte[]>

However, running the code above to read data from my Kafka cluster throws an exception.

I am running this on the Java DirectRunner with Beam 2.8.

I can consume and produce messages on my Confluent Kafka cluster, just not with the code above.

Recommended answer

If you follow the stack trace, it appears that the code tries to cast the timeout configuration property to Integer: https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

It receives a String instead. My guess is that this is because you set it as a String here: propertyBuilder.put("request.timeout.ms","20000"). I assume the correct thing would be to set it as an Integer, e.g. propertyBuilder.put("request.timeout.ms", 20000) (no quotes around the timeout value).
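To see this failure mode in isolation, the Beam-free sketch below reproduces it with a plain Map. The readRequestTimeout method is a hypothetical stand-in for the Integer cast in the linked KafkaUnboundedReader line, not Beam's actual code: a String value triggers a ClassCastException, while an Integer value is read normally.

```java
import java.util.HashMap;
import java.util.Map;

// Beam-free illustration of the failure mode. readRequestTimeout is a
// hypothetical stand-in for a reader that casts the property to Integer;
// it is NOT Beam's implementation.
public class TimeoutCastDemo {

    // Reads request.timeout.ms the way code expecting an Integer would.
    static int readRequestTimeout(Map<String, Object> consumerConfig) {
        return (Integer) consumerConfig.get("request.timeout.ms");
    }

    public static void main(String[] args) {
        // Value stored as a String, as in the question's code.
        Map<String, Object> asString = new HashMap<>();
        asString.put("request.timeout.ms", "20000");
        try {
            readRequestTimeout(asString);
        } catch (ClassCastException e) {
            System.out.println("String value: ClassCastException");
        }

        // Value stored as an int literal, autoboxed to Integer.
        Map<String, Object> asInteger = new HashMap<>();
        asInteger.put("request.timeout.ms", 20000);
        System.out.println("Integer value: " + readRequestTimeout(asInteger));
    }
}
```

The only difference between the two maps is the quotes around 20000, which is exactly the change suggested for the pipeline's property map.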

You may also have similar issues with other configuration properties (e.g. retry.backoff.ms), so double-check the property types.
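One way to double-check the types is to coerce known numeric keys to Integer before handing the map to updateConsumerProperties. This is a minimal sketch, assuming you maintain the set of numeric keys yourself; coerceToInteger is a hypothetical helper, not part of Beam or the Kafka client. Kafka's own config parser also accepts an Integer for long-typed settings such as retry.backoff.ms, so using Integer for both keys here should be safe.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of Beam or Kafka): converts String values of
// the listed numeric keys to Integer so code that casts them does not fail.
public class ConsumerPropertyTypes {

    // Returns a copy of props where each key in numericKeys that holds a
    // String is replaced by its parsed Integer; other entries are copied as-is.
    // Throws NumberFormatException if such a String is not a valid integer.
    static Map<String, Object> coerceToInteger(Map<String, Object> props, Set<String> numericKeys) {
        Map<String, Object> out = new HashMap<>(props);
        for (String key : numericKeys) {
            Object value = out.get(key);
            if (value instanceof String) {
                out.put(key, Integer.valueOf(((String) value).trim()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("sasl.mechanism", "PLAIN");
        props.put("request.timeout.ms", "20000");
        props.put("retry.backoff.ms", "500");

        Set<String> numericKeys = new HashSet<>(Arrays.asList("request.timeout.ms", "retry.backoff.ms"));
        Map<String, Object> fixed = coerceToInteger(props, numericKeys);

        // Print in sorted key order so the output is deterministic.
        for (String key : new TreeSet<>(fixed.keySet())) {
            Object v = fixed.get(key);
            System.out.println(key + " = " + v + " (" + v.getClass().getSimpleName() + ")");
        }
    }
}
```

Passing the returned map to updateConsumerProperties in place of the original leaves the rest of the pipeline unchanged; the original map is not modified.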

08-06 17:14