我正在尝试使用multiprocessing.Pool对象并行运行一些数据库查询。我正在使用MySQLdb。

我有一些模块级函数,可在其中定义要运行的查询,如下所示:

def check_foo(cursor, table):
    query = "(some query)"
    cursor.execute(query)
    results = cursor.fetchall()
    return len(results) == 0


这些函数在程序运行时收集,如下所示:

if __name__ == '__main__':
    check_functions = [v for k, v in globals().items()
                             if k.startswith('check_') and callable(v)]


我还有一个模块级函数,该函数在表列表上运行特定的检查函数:

def run_check_on_all((tables, cursor, f)):
    return [f(cursor, table) for table in tables]


我希望每个检查函数都有一个工作进程,该函数将为此函数调用run_check_on_all。这是我的尝试:

if __name__ == '__main__':
    ...

    pool = multiprocessing.Pool(len(check_functions))
    cursors = [conn.cursor() for i in range(len(check_functions))]

    print "Running {0} check(s)...".format(len(check_functions))
    table_lists = [table_list] * len(check_functions)
    all_results = pool.map(run_check_on_all, zip(table_lists, cursors, check_functions))


当我尝试运行此命令时,出现以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Python2.6/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/local/Python2.6/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/Python2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed


如您所见(希望如此),调用pool.map所涉及的都不是实例方法。 run_check_on_all和每个check_functions是模块级功能。 table_lists是字符串列表的列表。 cursors是MySQLdb游标对象的列表。

我以为这可能与在check函数中调用游标对象的实例方法有关,但是我将其替换为这样的虚拟函数

def check_foo(cursor, table):
    print "hello"


仍然没有运气。

错误所指的实例方法在哪里?

最佳答案

问题是您尝试在进程之间传递数据库游标对象。每个进程都必须创建与数据库的连接,并在该连接上创建一个游标。

关于python - 无法使用Pool.map() pickle 实例方法,但是我没有实例方法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/17093258/

10-10 16:13