问题描述
我正在将融合的hdfs接收器连接器5.0.0与kafka 2.0.0结合使用,我需要使用ExtractTopic转换( https://docs.confluent.io/current/connect/transforms/extracttopic.html ).我的连接器工作正常,但是当我添加此转换时,即使在只有2个属性的简单数据样本上,也会出现NullPointerException.
I am using confluent hdfs sink connector 5.0.0 with kafka 2.0.0 and I need to use ExtractTopic transformation (https://docs.confluent.io/current/connect/transforms/extracttopic.html). My connector works fine but when I add this transformation I get NullPointerException, even on simple data sample with only 2 attributes.
ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这是连接器的配置:
name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD
# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet
hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}
# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000
# Hive integration
hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect
# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset
transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true
我正在使用架构注册表,数据为avro格式,并且我确定给定的属性name
不为null.有什么建议?我基本上需要提取给定字段的内容并将其用作主题名称.
I am using schema registry, data is in avro format and I am sure the given attribute name
is not null. Any suggestions? What I need is basically to extract content of given field and use it as a topic name.
即使在像avro格式这样的简单json上,它也会发生:
It happens even on simple json like this in avro format:
{
"attr": "tmp",
"name": "topic1"
}
推荐答案
简短的回答是因为,您在转换"中更改了主题的名称.
Short answer is because, you change the name of the topic in your Transformation.
Hdfs连接器具有单独的TopicPartitionWriter
.当创建每个分区TopicPartitionWriter
的open(...)
方法中创建负责处理消息的SinkTask时.
Hdfs Connector for each topic partition has separate TopicPartitionWriter
. When SinkTask, that is responsible for processing messages is created in open(...)
method for each partition TopicPartitionWriter
is created.
当处理SinkRecords时,它会根据 topic 名称和 partition 编号查找TopicPartitionWriter
,并尝试将记录追加到其缓冲区中.在您的情况下,找不到任何写消息.主题名称已通过Transformation更改,对于该对(主题,分区),未创建任何TopicPartitionWriter
.
When it processed SinkRecords, based on topic name and partition number it looks up for TopicPartitionWriter
and try to append record to its buffer. In your case it couldn't find any write for message. The topic name was changed by Transformation and for that pair (topic, partition) any TopicPartitionWriter
was not created.
SinkRecords已经设置了分区和主题,因此您不必应用任何转换.
SinkRecords, that are passed to HdfsSinkTask::put(Collection<SinkRecord> records)
, have partitions and topic already set, so you don't have to apply any Transformations.
我认为io.confluent.connect.transforms.ExtractTopic
应该用于SourceConnector
.
这篇关于kafka connect-具有hdfs接收器连接器的ExtractTopic转换抛出NullPointerException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!