我在mysql数据库表中有多达500k的数据行。我必须使用一些查询来处理该数据,并将结果查询数据插入5个不同的表中。
我的代码段如下所示:
def jobsFunction(values):
unique_values = []
ref_value = {}
for value in values:
if value not in unique_values:
unique_values.append(value[0])
# some select queries with other tables
# from the result insert into table1
for query_vals in select_query:
ref_val[id] = some_val
# Insert into table2 with query_vals
# Update table3 with query_vals
# insert into table4 for each iteration with some process
# insert into table5 based on ref_val[id]
if __name__ == '__main__':
query = "SELECT roll_no, user_id, tenant_item_id FROM table_name"
cursor.execute(query)
vals = cursor.fetchall()
values = list(vals)
jobFunction(values)
问题是完成整个过程需要超过12个小时。
因此,我决定使用
multiprocessing.Pool
使用如下代码完成该过程:import multiprocessing as mp
def jobsFunction(values):
# jobs function code
if __name__ == '__main__':
# values fetching
lock = mp.Lock()
p = mp.Pool()
p.map(jobsFunction, values)
p.close()
p.join()
但是在这种情况下,从主函数到
jobsFunction
的数据流不按顺序排列。我的问题是:我是否使用正确的方法满足我的要求
以及如何使用多处理或多线程有效地达到要求?
最佳答案
从数据库中取出数据,然后回写很慢。尽量避免它。一些数字:如果每个查询仅花费100毫秒,那么执行它们将花费13个小时以上。
考虑使用这种设计:不要将所有数据传输到Python在那里进行处理,而要使用一系列查询或SQL查询来完成数据库中的所有操作。因此,与其将数据读入Python列表中,不如使用SQL查询
insert into table1 (...)
select ... from table_name
要么
update table1 out
set out.col1 = source.col2,
out.col2 = source.col3 ...
from table_name source
where out.pk = source.pk
and ...
数据库经过优化可以复制数据。这些查询将非常快速地运行,尤其是当您正确设置索引时。
考虑使用帮助器表来使查询更加简单或高效,因为您可以创建,截断它们,用数据填充它们,然后为您的案例创建完美的索引。
只在Python中做真正复杂的事情,并确保它只处理几行。