Question
I'm trying to upsert a pandas dataframe to a MS SQL Server using pyodbc. I've used a similar approach before to do straight inserts, but the solution I've tried this time is incredibly slow. Is there a more streamlined way to accomplish an upsert than what I have?
sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()

for index, row in bdf.iterrows():
    res = cursor.execute(
        "UPDATE dbo.MPA_BOOK_RAW SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, [DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?",
        row['SITE'],
        row['SHIP_TO'],
        row['PROD_LINE'],
        row['GROUP_NUMBER'],
        row['DESCRIPTION'],
        row['ORDER_QTY'],
        row['BPS_INCLUDE'],
        row['CUST'],
        row['ORDER_NUMBER'],
        row['ORDER_DATE'],
        row['PURCHASE_ORDER'],
        row['CHANNEL'],
        row['ITEM'],
        row['END_DT'])
    if res.rowcount == 0:
        cursor.execute(
            "INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE]) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            row['SITE'],
            row['CUST'],
            row['ORDER_NUMBER'],
            row['ORDER_DATE'],
            row['PURCHASE_ORDER'],
            row['CHANNEL'],
            row['SHIP_TO'],
            row['PROD_LINE'],
            row['GROUP_NUMBER'],
            row['DESCRIPTION'],
            row['ITEM'],
            row['ORDER_QTY'],
            row['END_DT'],
            row['BPS_INCLUDE'])

sql_connect.commit()
cursor.close()
sql_connect.close()
I tried the above with a five row sample of my original ~50k row dataframe and it worked fine. So the logic seems okay. It's just the speed that is an issue.
Answer
Comments to the question suggest uploading the DataFrame to a temporary table and then merging the contents into the main table. Note, however, that the documentation for the T-SQL MERGE statement says:
Performance Tip: The conditional behavior described for the MERGE statement works best when the two tables have a complex mixture of matching characteristics. For example, inserting a row if it does not exist, or updating a row if it matches. When simply updating one table based on the rows of another table, improve performance and scalability with basic INSERT, UPDATE, and DELETE statements.
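For comparison, a MERGE-based upsert on a simple two-column key would look roughly like the sketch below (table and column names follow the demo further down; this is exactly the "simple match" case the performance tip says is better served by plain UPDATE/INSERT):

```python
# Sketch of the MERGE alternative that the performance tip discourages
# for simple key matches; shown here only for comparison.
merge_sql = """\
MERGE actual_table WITH (HOLDLOCK) AS a
USING #update_table AS u
    ON a.institution_no = u.institution_no
   AND a.transit_no = u.transit_no
WHEN MATCHED THEN
    UPDATE SET a.branch_name = u.branch_name
WHEN NOT MATCHED THEN
    INSERT (institution_no, transit_no, branch_name)
    VALUES (u.institution_no, u.transit_no, u.branch_name);
"""
```

Note that a MERGE statement must be terminated with a semicolon, and `WITH (HOLDLOCK)` is commonly added to avoid race conditions between the match check and the insert.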
In your case the matching criteria are relatively straightforward - just what is effectively a multi-column primary key - so you could simply use an anonymous code block with an UPDATE statement and an INSERT statement as in the following simplified MCVE code.
Minimum requirements:

- Python 3.6+ for f'...' string formatting
- SQLAlchemy 1.3 for the fast_executemany argument to create_engine
- DRIVER=ODBC Driver 17 for SQL Server; and UseFMTONLY=Yes; for reliable fast_executemany INSERTs to a SQL Server #temporary table
from pprint import pprint
import sys
import urllib

import pandas as pd
import pyodbc
import sqlalchemy as sa

print(sys.version)
# 3.7.5 (tags/v3.7.5:5c02a39a0b, Oct 15 2019, 00:11:34) [MSC v.1916 64 bit (AMD64)]
print(
    f"SQLAlchemy {sa.__version__}, pandas {pd.__version__}, pyodbc {pyodbc.version}"
)
# SQLAlchemy 1.3.19, pandas 1.1.2, pyodbc 4.0.30

connection_string = (
    r"DRIVER=ODBC Driver 17 for SQL Server;"
    r"SERVER=(local)\SQLEXPRESS;"
    r"DATABASE=myDb;"
    r"Trusted_Connection=Yes;"
    r"UseFMTONLY=Yes;"
)
sqlalchemy_url = "mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(
    connection_string
)
engine = sa.create_engine(sqlalchemy_url, fast_executemany=True)

with engine.begin() as conn:
    # set up test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS actual_table;"))
    conn.execute(
        sa.text(
            """\
            CREATE TABLE actual_table (
                institution_no VARCHAR(3),
                transit_no VARCHAR(5),
                branch_name VARCHAR(50),
                CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED
                    (institution_no, transit_no));
            """
        )
    )
    # actual_table initial state
    conn.execute(
        sa.text(
            """\
            INSERT INTO actual_table (institution_no, transit_no, branch_name) VALUES
                ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
                ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
            """
        )
    )

    # test data to be updated or inserted
    update_columns = ["institution_no", "transit_no", "branch_name"]
    update_data = [
        ["004", "12345", "TD branch #12345 - London, ON"],
        ["002", "45678", "Scotiabank branch #45678 - Timmins, ON"],
        ["004", "34567", "TD branch #34567 - Toronto, ON"],
    ]
    df_update = pd.DataFrame(update_data, columns=update_columns)

    # Here's where the real work begins ...
    #
    # Step 1: upload update data
    df_update.to_sql("#update_table", conn, index=False)
    #
    # Step 2: perform the "upsert"
    sql = """\
    SET NOCOUNT ON;
    DECLARE @rows_updated INT = 0;
    DECLARE @rows_inserted INT = 0;

    UPDATE a SET a.branch_name = u.branch_name
        FROM actual_table a INNER JOIN #update_table u
            ON a.institution_no = u.institution_no
                AND a.transit_no = u.transit_no;
    SELECT @rows_updated = @@ROWCOUNT;

    INSERT INTO actual_table (institution_no, transit_no, branch_name)
    SELECT institution_no, transit_no, branch_name
    FROM #update_table u
    WHERE NOT EXISTS (
        SELECT * FROM actual_table
        WHERE institution_no = u.institution_no
            AND transit_no = u.transit_no
    );
    SELECT @rows_inserted = @@ROWCOUNT;

    SELECT @rows_updated AS rows_updated, @rows_inserted AS rows_inserted;
    """
    result = conn.execute(sa.text(sql)).fetchone()
    print(f"{result[0]} row(s) updated, {result[1]} row(s) inserted")
    # 1 row(s) updated, 2 row(s) inserted

# verify results
with engine.begin() as conn:
    pprint(conn.execute(sa.text("SELECT * FROM actual_table")).fetchall())
"""console output:
[('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
 ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
 ('004', '12345', 'TD branch #12345 - London, ON'),
 ('004', '34567', 'TD branch #34567 - Toronto, ON')]
"""
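To apply the same pattern to the MPA_BOOK_RAW table from the question, the anonymous UPDATE/INSERT block can be generated from the key and non-key column lists so the 14 column names live in one place. Below is a sketch of such a helper; the function name and the `#update_table` temp-table name are illustrative, and the pyodbc/SQL Server round trip (to_sql upload, then executing the returned batch) is not shown:

```python
def build_upsert_sql(table, temp_table, key_cols, update_cols):
    """Build the temp-table-based UPDATE-then-INSERT 'upsert' batch.

    key_cols    - columns that identify a row (the match criteria)
    update_cols - columns to overwrite when a match is found
    """
    on_clause = " AND ".join(f"a.[{c}] = u.[{c}]" for c in key_cols)
    set_clause = ", ".join(f"a.[{c}] = u.[{c}]" for c in update_cols)
    all_cols = ", ".join(f"[{c}]" for c in key_cols + update_cols)
    sel_cols = ", ".join(f"u.[{c}]" for c in key_cols + update_cols)
    not_exists = " AND ".join(f"[{c}] = u.[{c}]" for c in key_cols)
    return f"""\
UPDATE a SET {set_clause}
FROM {table} a INNER JOIN {temp_table} u
    ON {on_clause};
INSERT INTO {table} ({all_cols})
SELECT {sel_cols}
FROM {temp_table} u
WHERE NOT EXISTS (
    SELECT * FROM {table}
    WHERE {not_exists}
);
"""

# Key and non-key columns taken from the question's UPDATE statement
key_cols = ["CUST", "ORDER_NUMBER", "ORDER_DATE", "PURCHASE_ORDER",
            "CHANNEL", "ITEM", "END_DT"]
update_cols = ["SITE", "SHIP_TO", "PROD_LINE", "GROUP_NUMBER",
               "DESCRIPTION", "ORDER_QTY", "BPS_INCLUDE"]
sql = build_upsert_sql("dbo.MPA_BOOK_RAW", "#update_table", key_cols, update_cols)
```

The generated batch would then replace the hard-coded `sql` string in the demo above, after uploading the ~50k-row dataframe with `to_sql("#update_table", conn, index=False)`.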