我在mysql数据库表中有多达500k的数据行。我必须使用一些查询来处理该数据,并将结果查询数据插入5个不同的表中。

我的代码段如下所示:

def jobsFunction(values):
    unique_values = []
    ref_value = {}
    for value in values:
        if value not in unique_values:
            unique_values.append(value[0])
            # some select queries with other tables
            # from the result insert into table1
            for query_vals in select_query:
                ref_val[id] = some_val
                # Insert into table2 with query_vals
                # Update table3 with query_vals
        # insert into table4 for each iteration with some process
        # insert into table5 based on ref_val[id]

if __name__ == '__main__':
    query = "SELECT roll_no, user_id, tenant_item_id FROM table_name"
    cursor.execute(query)
    vals = cursor.fetchall()
    values = list(vals)
    jobFunction(values)


问题是完成整个过程需要超过12个小时。
因此,我决定使用multiprocessing.Pool使用如下代码完成该过程:

import multiprocessing as mp

def jobsFunction(values):
    # jobs function code

if __name__ == '__main__':
    # values fetching
    lock = mp.Lock()
    p = mp.Pool()
    p.map(jobsFunction, values)
    p.close()
    p.join()


但是在这种情况下,从主函数到jobsFunction的数据流不按顺序排列。

我的问题是:我是否使用正确的方法满足我的要求
以及如何使用多处理或多线程有效地达到要求?

最佳答案

从数据库中取出数据,然后回写很慢。尽量避免它。一些数字:如果每个查询仅花费100毫秒,那么执行它们将花费13个小时以上。

考虑使用这种设计:不要将所有数据传输到Python在那里进行处理,而要使用一系列查询或SQL查询来完成数据库中的所有操作。因此,与其将数据读入Python列表中,不如使用SQL查询

insert into table1 (...)
select ... from table_name


要么

update table1 out
set out.col1 = source.col2,
    out.col2 = source.col3 ...
from table_name source
where out.pk = source.pk
  and ...


数据库经过优化可以复制数据。这些查询将非常快速地运行,尤其是当您正确设置索引时。

考虑使用帮助器表来使查询更加简单或高效,因为您可以创建,截断它们,用数据填充它们,然后为您的案例创建完美的索引。

只在Python中做真正复杂的事情,并确保它只处理几行。

09-11 18:59
查看更多