本文介绍了Kafka JDBC Sink Connector,批量插入值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我每秒收到很多消息(通过 http 协议)(50000 - 100000)并想将它们保存到 PostgreSql.为此,我决定使用 Kafka JDBC Sink.

I receive a lot of the messages (by http-protocol) per second (50000 - 100000) and want to save them to PostgreSql. I decided to use Kafka JDBC Sink for this purpose.

消息按一条记录保存到数据库中,而不是批量保存.我想在 PostgreSQL 中批量插入记录,记录大小为 500-1000.

The messages are saved to database by one record, not in batches. I want to insert records in PostgreSQL in batches with size 500-1000 records.

我在这个问题上找到了一些答案:如何使用批处理.尺寸?

I found some answers on this problem in issue: How to use batch.size?

我尝试在配置中使用相关选项,但似乎没有任何效果.

I tried to use related options in configuration, but it seems that they no have any effect.

我的 Kafka JDBC Sink PostgreSql 配置(etc/kafka-connect-jdbc/postgres.properties):

My Kafka JDBC Sink PostgreSql configuration (etc/kafka-connect-jdbc/postgres.properties):

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3

# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false

insert.mode=insert
connection.user=postgres
table.name.format=${topic}

connection.password=pwd

batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000

我还在connect-distributed.properties中添加了选项:

consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500

虽然每个分区每秒获取超过 1000 条记录,但记录会被 1 条保存到 PostgreSQL.

Although each a partition gets more than 1000 records per second, records are saved to PostgreSQL by one.

消费者选项已添加到具有正确名称的其他文件中

我还在etc/schema-registry/connect-avro-standalone.properties中添加了选项:

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000

推荐答案

我意识到我误解了文档.记录被一条一条地插入到数据库中.一笔交易中插入的记录数取决于batch.sizeconsumer.max.poll.records.我预计批量插入是以另一种方式实现的.我想有一个选项来插入这样的记录:

I realised that I misunderstood the documentation. The records are inserted in database one by one. The count of the records inserted in one transaction depends on batch.size and consumer.max.poll.records. I expected that the batch insert was implemented the other way. I would like to have an option to insert records like this:

INSERT INTO table1 (First, Last)
VALUES
    ('Fred', 'Smith'),
    ('John', 'Smith'),
    ('Michael', 'Smith'),
    ('Robert', 'Smith');

但这似乎是不可能的.

这篇关于Kafka JDBC Sink Connector,批量插入值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-30 21:31