Has anyone come across a Java library (or just some code) for writing the binary format used by Postgres's COPY command?

It looks pretty simple, but if someone has already worked out the correct tuple data format, I'd rather start from there.

Actually, even just a description of the format for all the data types would help.

Thanks.

Best Answer

You can try PgBulkInsert, which implements the binary copy protocol for PostgreSQL:

  • https://github.com/bytefish/PgBulkInsert

  • It is also available from the Maven Central repository.

    Disclaimer: I am the author of the project.

    The PostgreSQL Binary Copy Protocol

    I don't want to simply advertise my project, so let me also write about the protocol itself.

    First I wrote a PgBinaryWriter class, which wraps a DataOutputStream and provides methods for writing the binary protocol header, a method for starting a new row (the binary copy protocol requires you to write the number of columns for every row you insert), and a write method that takes an IValueHandler<TTargetType> for writing a given Java type.
    PgBinaryWriter implements AutoCloseable, because a -1 has to be written to the stream before flushing and closing it.
    An IValueHandler<TTargetType> takes a DataOutputStream and a value; it is responsible for writing the given value in the PostgreSQL binary protocol format.
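    Before looking at the classes, the fixed framing of the format can be shown standalone: the stream starts with a 19-byte header (the 11-byte signature PGCOPY\n\377\r\n\0, a 32-bit flags field, and a 32-bit header extension length), and ends with a 16-bit -1. A minimal sketch (not part of the library) that builds just the header:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CopyBinaryFormatSketch {

    // Builds the fixed 19-byte COPY BINARY header: 11-byte signature,
    // 32-bit flags field (0 = no OIDs), 32-bit header extension length (0).
    static byte[] header() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBytes("PGCOPY\n\377\r\n\0"); // signature; \377 guards against 8-bit stripping
        out.writeInt(0);                       // flags field
        out.writeInt(0);                       // header extension area length
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] h = header();
        System.out.println(h.length);    // 19
        System.out.println(h[7] & 0xFF); // 255, the \377 byte
    }
}
```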

    PgBinaryWriter

    // Copyright (c) Philipp Wagner. All rights reserved.
    // Licensed under the MIT license. See LICENSE file in the project root for full license information.
    
    package de.bytefish.pgbulkinsert.pgsql;
    
    
    import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
    import de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;
    
    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.OutputStream;
    
    public class PgBinaryWriter implements AutoCloseable {
    
        /** The DataOutputStream used to write the output. */
        private transient DataOutputStream buffer;
    
        public PgBinaryWriter() {
        }
    
        public void open(final OutputStream out) {
            buffer = new DataOutputStream(new BufferedOutputStream(out));
    
            writeHeader();
        }
    
        private void writeHeader() {
            try {
    
                // 11 bytes required header
                buffer.writeBytes("PGCOPY\n\377\r\n\0");
                // 32 bit integer indicating no OID
                buffer.writeInt(0);
                // 32 bit header extension area length
                buffer.writeInt(0);
    
            } catch(Exception e) {
                throw new BinaryWriteFailedException(e);
            }
        }
    
        public void startRow(int numColumns) {
            try {
                buffer.writeShort(numColumns);
            } catch(Exception e) {
                throw new BinaryWriteFailedException(e);
            }
        }
    
        public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
            handler.handle(buffer, value);
        }
    
        @Override
        public void close() {
            try {
                buffer.writeShort(-1);
    
                buffer.flush();
                buffer.close();
            } catch(Exception e) {
                throw new BinaryWriteFailedException(e);
            }
        }
    }
    

    Value Handlers

    The IValueHandler is a simple interface with a handle method, which takes a DataOutputStream and a value.

    // Copyright (c) Philipp Wagner. All rights reserved.
    // Licensed under the MIT license. See LICENSE file in the project root for full license information.
    
    package de.bytefish.pgbulkinsert.pgsql.handlers;
    
    import java.io.DataOutputStream;
    import java.lang.reflect.Type;
    
    public interface IValueHandler<TTargetType> extends ValueHandler {
    
        void handle(DataOutputStream buffer, final TTargetType value);
    
        Type getTargetType();
    
    }
    

    It is important to understand the protocol: a -1 must be written whenever a value is null. I wrote an abstract base class to handle this case.

    // Copyright (c) Philipp Wagner. All rights reserved.
    // Licensed under the MIT license. See LICENSE file in the project root for full license information.
    
    package de.bytefish.pgbulkinsert.pgsql.handlers;
    
    import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
    
    import java.io.DataOutputStream;
    
    public abstract class BaseValueHandler<T> implements IValueHandler<T> {
    
        @Override
        public void handle(DataOutputStream buffer, final T value) {
            try {
                if (value == null) {
                    buffer.writeInt(-1);
                    return;
                }
                internalHandle(buffer, value);
            } catch (Exception e) {
                throw new BinaryWriteFailedException(e);
            }
        }
    
        protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
    }
    

    Handlers for the various Java types can then be implemented. This is the example for a long; you can find the
    other implementations in the handlers package of the GitHub repository.

    // Copyright (c) Philipp Wagner. All rights reserved.
    // Licensed under the MIT license. See LICENSE file in the project root for full license information.
    
    package de.bytefish.pgbulkinsert.pgsql.handlers;
    
    import java.io.DataOutputStream;
    import java.lang.reflect.Type;
    
    public class LongValueHandler extends BaseValueHandler<Long> {
    
        @Override
        protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
            buffer.writeInt(8);
            buffer.writeLong(value);
        }
    
        @Override
        public Type getTargetType() {
            return Long.class;
        }
    }
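    The other scalar handlers follow the same pattern: a 4-byte length prefix followed by the big-endian payload, with a length of -1 for null. A standalone int4 sketch of that pattern (not the repository's actual class):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Int4FieldSketch {

    // Writes one int4 field: a 4-byte length prefix, then the big-endian
    // payload; a null value is encoded as a length of -1 with no payload.
    static void writeInt4(DataOutputStream out, Integer value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(4);     // payload length in bytes
        out.writeInt(value); // big-endian 32-bit value
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeInt4(out, 42);
        writeInt4(out, null);
        out.flush();
        System.out.println(bytes.size()); // 8 bytes for the value, 4 for the null
    }
}
```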
    

    Using the PgBinaryWriter

    Now the parts can finally be connected. Note that I have abstracted some more parts away, so you may have to look at the code for additional implementation details.

    public abstract class PgBulkInsert<TEntity> {
    
        // ...
    
        public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {
    
            CopyManager cpManager = connection.getCopyAPI();
            CopyIn copyIn = cpManager.copyIn(getCopyCommand());
    
            int columnCount = columns.size();
    
            try (PgBinaryWriter bw = new PgBinaryWriter()) {
    
                // Wrap the CopyOutputStream in our own Writer:
                bw.open(new PGCopyOutputStream(copyIn));
    
                // Insert all entities:
                entities.forEach(entity -> {
    
                    // Start a New Row:
                    bw.startRow(columnCount);
    
                    // Insert the Column Data:
                    columns.forEach(column -> {
                        try {
                            column.getWrite().invoke(bw, entity);
                        } catch (Exception e) {
                            throw new SaveEntityFailedException(e);
                        }
                    });
                });
            }
        }
    
        private String getCopyCommand()
        {
            String commaSeparatedColumns = columns.stream()
                    .map(x -> x.columnName)
                    .collect(Collectors.joining(", "));
    
            return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                    table.GetFullQualifiedTableName(),
                    commaSeparatedColumns);
        }
    }
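    For illustration, the complete wire format for a single bigint row can also be assembled by hand, independent of the library. This sketch should correspond byte for byte to what the writer above pushes through the PGCopyOutputStream for a one-column int8 table:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CopyStreamSketch {

    // Assembles a complete COPY BINARY stream containing one row with a
    // single int8 column, followed by the -1 trailer that ends the stream.
    static byte[] singleLongRow(long value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBytes("PGCOPY\n\377\r\n\0"); // 11-byte signature
        out.writeInt(0);                       // flags field
        out.writeInt(0);                       // header extension length
        out.writeShort(1);                     // row: 1 column
        out.writeInt(8);                       // field length: int8 is 8 bytes
        out.writeLong(value);                  // big-endian payload
        out.writeShort(-1);                    // file trailer
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // 19-byte header + 2-byte column count + 4-byte length + 8-byte value + 2-byte trailer
        System.out.println(singleLongRow(42L).length); // 35
    }
}
```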
    

    PgBulkInsert

    PgBulkInsert supports the following PostgreSQL data types.
  • Numeric Types
      • smallint
      • integer
      • bigint
      • real
      • double precision
  • Date/Time Types
      • timestamp
      • date
  • Character Types
      • text
  • Boolean Type
      • boolean
  • Binary Data Types
      • bytea
  • Network Address Types
      • inet (IPv4, IPv6)
  • UUID Type
      • uuid
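    As an example of these encodings, a uuid value is written as 16 raw bytes in big-endian order (most significant half first), again behind a 4-byte length prefix. A standalone sketch of such a field writer, based on my reading of the format rather than the repository's exact handler:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;

public class UuidFieldSketch {

    // Writes one uuid field: a 4-byte length prefix (16), then the 16 raw
    // bytes in big-endian order; null is encoded as a length of -1.
    static void writeUuid(DataOutputStream out, UUID value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(16);
        out.writeLong(value.getMostSignificantBits());
        out.writeLong(value.getLeastSignificantBits());
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeUuid(out, UUID.fromString("00112233-4455-6677-8899-aabbccddeeff"));
        out.flush();
        System.out.println(bytes.size()); // 4-byte length prefix + 16 payload bytes
    }
}
```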

  • Basic Usage

    Imagine a large amount of Person data should be bulk inserted into a PostgreSQL database. Each Person has a first name, a last name and a birth date.

    Database Table

    The table in the PostgreSQL database might look like this:

    CREATE TABLE sample.unit_test
    (
        first_name text,
        last_name text,
        birth_date date
    );
    

    Domain Model

    The domain model in the application might look like this:

    public class Person {
    
        private String firstName;
    
        private String lastName;
    
        private LocalDate birthDate;
    
        public Person() {}
    
        public String getFirstName() {
            return firstName;
        }
    
        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    
        public String getLastName() {
            return lastName;
        }
    
        public void setLastName(String lastName) {
            this.lastName = lastName;
        }
    
        public LocalDate getBirthDate() {
            return birthDate;
        }
    
        public void setBirthDate(LocalDate birthDate) {
            this.birthDate = birthDate;
        }
    
    }
    

    Bulk Inserter

    Then you have to implement a PgBulkInsert<Person>, which defines the mapping between the table and the domain model.

    public class PersonBulkInserter extends PgBulkInsert<Person>
    {
        public PersonBulkInserter() {
            super("sample", "unit_test");
    
            MapString("first_name", Person::getFirstName);
            MapString("last_name", Person::getLastName);
            MapDate("birth_date", Person::getBirthDate);
        }
    }
    

    Using the Bulk Inserter

    Finally we can write a unit test to insert 100,000 Persons into the database. You can find the entire unit test on GitHub: IntegrationTest.java

    @Test
    public void bulkInsertPersonDataTest() throws SQLException {
        // Create a large list of Persons:
        List<Person> persons = getPersonList(100000);
    
        // Create the BulkInserter:
        PersonBulkInserter personBulkInserter = new PersonBulkInserter();
    
        // Now save all entities of a given stream:
        personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());
    
        // And assert all have been written to the database:
        Assert.assertEquals(100000, getRowCount());
    }
    
    private List<Person> getPersonList(int numPersons) {
        List<Person> persons = new ArrayList<>();
    
        for (int pos = 0; pos < numPersons; pos++) {
            Person p = new Person();
    
            p.setFirstName("Philipp");
            p.setLastName("Wagner");
            p.setBirthDate(LocalDate.of(1986, 5, 12));
    
            persons.add(p);
        }
    
        return persons;
    }
    

    07-24 09:46