Efficient upsert of a Pandas dataframe to MS SQL Server using pyodbc

This article covers an efficient way to upsert a Pandas dataframe to MS SQL Server using pyodbc.

Problem description

I'm trying to upsert a pandas dataframe to a MS SQL Server using pyodbc. I've used a similar approach before to do straight inserts, but the solution I've tried this time is incredibly slow. Is there a more streamlined way to accomplish an upsert than what I have?

sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()

for index, row in bdf.iterrows():
    res = cursor.execute("UPDATE dbo.MPA_BOOK_RAW SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, [DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?",
                    row['SITE'],
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ORDER_QTY'],
                    row['BPS_INCLUDE'],
                    row['CUST'],
                    row['ORDER_NUMBER'],
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'],
                    row['CHANNEL'],
                    row['ITEM'],
                    row['END_DT'])

    if res.rowcount == 0:
            cursor.execute("INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE]) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    row['SITE'],
                    row['CUST'],
                    row['ORDER_NUMBER'],
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'],
                    row['CHANNEL'],
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ITEM'],
                    row['ORDER_QTY'],
                    row['END_DT'],
                    row['BPS_INCLUDE'])

    sql_connect.commit()

cursor.close()
sql_connect.close()

I tried the above with a five row sample of my original ~50k row dataframe and it worked fine. So the logic seems okay. It's just the speed that is an issue.

Recommended answer

Comments to the question suggest uploading the DataFrame to a temporary table and then merging the contents into the main table. Note, however, that the documentation for the T-SQL MERGE statement says:

Performance Tip: The conditional behavior described for the MERGE statement works best when the two tables have a complex mixture of matching characteristics. For example, inserting a row if it does not exist, or updating a row if it matches. When simply updating one table based on the rows of another table, improve performance and scalability with basic INSERT, UPDATE, and DELETE statements.
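
For reference, the MERGE alternative suggested in the comments would look roughly like the following T-SQL, built here as a Python string so it could be passed to `conn.execute(sa.text(...))`. The table and column names follow the example tables used in this answer; treat this as an untested sketch, not a benchmarked drop-in replacement:

```python
# Sketch of the MERGE-based upsert mentioned in the comments (not benchmarked).
# Table and column names follow the actual_table / #update_table example
# used in this answer.
merge_sql = """\
MERGE actual_table AS a
USING #update_table AS u
    ON a.institution_no = u.institution_no
   AND a.transit_no = u.transit_no
WHEN MATCHED THEN
    UPDATE SET a.branch_name = u.branch_name
WHEN NOT MATCHED THEN
    INSERT (institution_no, transit_no, branch_name)
    VALUES (u.institution_no, u.transit_no, u.branch_name);
"""

# With a live SQL Server connection this would be executed as:
#     conn.execute(sa.text(merge_sql))
print("WHEN MATCHED" in merge_sql)  # → True
```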

In your case the matching criteria are relatively straightforward - just what is effectively a multi-column primary key - so you could simply use an anonymous code block with an UPDATE statement and an INSERT statement as in the following simplified MCVE code.

Minimum requirements:

  • Python 3.6+ for f'...' string formatting
  • SQLAlchemy 1.3 for the fast_executemany argument to create_engine
  • DRIVER=ODBC Driver 17 for SQL Server; and UseFMTONLY=Yes; for reliable fast_executemany INSERTs to a SQL Server #temporary table
from pprint import pprint
import sys
import urllib

import pandas as pd
import pyodbc
import sqlalchemy as sa

print(sys.version)
# 3.7.5 (tags/v3.7.5:5c02a39a0b, Oct 15 2019, 00:11:34) [MSC v.1916 64 bit (AMD64)]
print(
    f"SQLAlchemy {sa.__version__}, pandas {pd.__version__}, pyodbc {pyodbc.version}"
)
# SQLAlchemy 1.3.19, pandas 1.1.2, pyodbc 4.0.30

connection_string = (
    r"DRIVER=ODBC Driver 17 for SQL Server;"
    r"SERVER=(local)\SQLEXPRESS;"
    r"DATABASE=myDb;"
    r"Trusted_Connection=Yes;"
    r"UseFMTONLY=Yes;"
)
sqlalchemy_url = "mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(
    connection_string
)
engine = sa.create_engine(sqlalchemy_url, fast_executemany=True)

with engine.begin() as conn:
    # set up test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS actual_table;"))
    conn.execute(
        sa.text(
            """\
            CREATE TABLE actual_table (
                institution_no VARCHAR(3),
                transit_no VARCHAR(5),
                branch_name VARCHAR(50),
                CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED
                    (institution_no, transit_no));
            """
        )
    )
    # actual_table initial state
    conn.execute(
        sa.text(
            """\
            INSERT INTO actual_table (institution_no, transit_no, branch_name) VALUES
                ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
                ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
            """
        )
    )
    # test data to be updated or inserted
    update_columns = ["institution_no", "transit_no", "branch_name"]
    update_data = [
        ["004", "12345", "TD branch #12345 - London, ON"],
        ["002", "45678", "Scotiabank branch #45678 - Timmins, ON"],
        ["004", "34567", "TD branch #34567 - Toronto, ON"],
    ]
    df_update = pd.DataFrame(update_data, columns=update_columns)

    # Here's where the real work begins ...
    #
    # Step 1: upload update data
    df_update.to_sql("#update_table", conn, index=False)
    #
    # Step 2: perform the "upsert"
    sql = """\
    SET NOCOUNT ON;
    DECLARE @rows_updated INT = 0;
    DECLARE @rows_inserted INT = 0;

    UPDATE a SET a.branch_name = u.branch_name
        FROM actual_table a INNER JOIN #update_table u
            ON a.institution_no = u.institution_no
                AND a.transit_no = u.transit_no;
    SELECT @rows_updated = @@ROWCOUNT;

    INSERT INTO actual_table (institution_no, transit_no, branch_name)
        SELECT institution_no, transit_no, branch_name
        FROM #update_table u
        WHERE NOT EXISTS (
            SELECT * FROM actual_table
            WHERE institution_no = u.institution_no
                AND transit_no = u.transit_no
        );
    SELECT @rows_inserted = @@ROWCOUNT;

    SELECT @rows_updated AS rows_updated, @rows_inserted AS rows_inserted;
    """
    result = conn.execute(sa.text(sql)).fetchone()
    print(f"{result[0]} row(s) updated, {result[1]} row(s) inserted")
    # 1 row(s) updated, 2 row(s) inserted

# verify results
with engine.begin() as conn:
    pprint(conn.execute(sa.text("SELECT * FROM actual_table")).fetchall())
    """console output:
    [('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
     ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
     ('004', '12345', 'TD branch #12345 - London, ON'),
     ('004', '34567', 'TD branch #34567 - Toronto, ON')]
    """
