我正在使用python脚本在postgresql中插入或更新大约3到4百万个数据。请看下面的代码。要求是插入,如果它的新的密钥或更新键与新的值,如果键已经存在。但下面的代码与数据库的往返连接太多,在数据库中插入300万条记录大约需要35-45分钟,这非常缓慢。如何避免往返连接并以更快的方式插入或更新?
任何帮助都将不胜感激。
提前谢谢你的帮助。
InputFile.txt-此文件有大约300万到400万行itesm

productKey1 printer1,printerModel1,printerPrice1,printerDesc1|
productKey2 sacnner2,scannerModel2,scannerPrice2,scannerDesc2|
productKey3 mobile3,mobileModel3,mobilePrice3,mobileDesc3|
productKey4 tv4,tvModel4,tvPrice4,tvDescription4|
productKey2 sacnner22,scannerModel22,scannerPrice22,scannerDesc22|

插入.py
def insertProduct(filename, conn):
   seen = set()
   cursor = conn.cursor()
   qi = "INSERT INTO productTable (key, value) VALUES (%s, %s);"
   qu = "UPDATE productTable SET value = CONCAT(value, %s) WHERE key = %s;"

   with open(filename) as f:
     for line in f:
       if line.strip():
         key, value = line.split(' ', 1)
         if key not in seen:
            seen.add(key)
            cursor.execute(qi, (key, value))
         else:
            cursor.execute(qu, (value, key))

         conn.commit()

conn = psycopg2.connect("dbname='productDB' user='myuser' host='localhost'")
insertProduct('InputFile.txt', conn)

最佳答案

执行一批批准备好的语句。http://initd.org/psycopg/docs/extras.html#fast-execution-helpers

import psycopg2, psycopg2.extras
def insertProduct(filename, conn):

    data = []
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if line:
                key, value = line.split(' ', 1)
                data.append((key, value))

    cursor = conn.cursor()
    cursor.execute("""
        prepare upsert (text, text) as
        with i as (
            insert into productTable (key, value)
            select $1, $2
            where not exists (select 1 from productTable where key = $1)
            returning *
        )
        update productTable p
        set value = concat (p.value, $2)
        where p.key = $1 and not exists (select 1 from i)
    """)
    psycopg2.extras.execute_batch(cursor, "execute upsert (%s, %s)", data, page_size=500)
    cursor.execute("deallocate upsert")
    conn.commit()

conn = psycopg2.connect(database='cpn')
insertProduct('InputFile.txt', conn)

09-26 06:15