问题描述
我是python的新手,我具有为我的数据计算功能,然后返回应处理并写入文件的列表的功能...我正在使用Pool进行计算,然后使用要写入文件的回调函数,但是没有调用该回调函数,我已经在其中添加了一些print语句,但绝对没有调用它.我的代码如下:
I am a newbie to python,i am have function that calculate feature for my data and then return a list that should be processed and written in file.,..i am using Pool to do the calculation and then and use the callback function to write into file,however the callback function is not being call,i ve put some print statement in it but it is definetly not being called.my code looks like this:
def write_arrow_format(results):
print("writer called")
results[1].to_csv("../data/model_data/feature-"+results[2],sep='\t',encoding='utf-8')
with open('../data/model_data/arow-'+results[2],'w') as f:
for dic in results[0]:
feature_list=[]
print(dic)
beginLine=True
for key,value in dic.items():
if(beginLine):
feature_list.append(str(value))
beginLine=False
else:
feature_list.append(str(key)+":"+str(value))
feature_line=" ".join(feature_list)
f.write(feature_line+"\n")
def generate_features(users,impressions,interactions,items,filename):
#some processing
return [result1,result2,filename]
if __name__=="__main__":
pool=mp.Pool(mp.cpu_count()-1)
for i in range(interval):
if i==interval:
pool.apply_async(generate_features,(users[begin:],impressions,interactions,items,str(i)),callback=write_arrow_format)
else:
pool.apply_async(generate_features,(users[begin:begin+interval],impressions,interactions,items,str(i)),callback=write_arrow_format)
begin=begin+interval
pool.close()
pool.join()
推荐答案
在您的帖子中,generate_features
返回的列表中包含的内容并不明显.但是,如果result1
,result2
或filename
中的任何一个都不可序列化,则由于某种原因,多处理库将不会调用回调函数,并且将无法静默地执行此操作.我认为这是因为多处理库尝试在子进程和父进程之间来回传递对象之前对其进行酸洗.如果您返回的任何内容都不是可修复的"(即不可序列化),则不会调用该回调.
It's not obvious from your post what is contained in the list returned by generate_features
. However, if any of result1
, result2
, or filename
are not serializable, then for some reason the multiprocessing lib will not call the callback function and will fail to do so silently. I think this is because the multiprocessing lib attempts to pickle objects before passing them back and forth between child processes and the parent process. If anything you're returning isn't "pickleable" (i.e not serializable) then the callback doesn't get called.
我自己遇到了这个错误,事实证明这是一个给我带来麻烦的logger对象的实例.这里是一些示例代码来重现我的问题:
I've encountered this bug myself, and it turned out to be an instance of a logger object that was giving me troubles. Here is some sample code to reproduce my issue:
import multiprocessing as mp
import logging
def bad_test_func(ii):
print('Calling bad function with arg %i'%ii)
name = "file_%i.log"%ii
logging.basicConfig(filename=name,level=logging.DEBUG)
if ii < 4:
log = logging.getLogger()
else:
log = "Test log %i"%ii
return log
def good_test_func(ii):
print('Calling good function with arg %i'%ii)
instance = ('hello', 'world', ii)
return instance
def pool_test(func):
def callback(item):
print('This is the callback')
print('I have been given the following item: ')
print(item)
num_processes = 3
pool = mp.Pool(processes = num_processes)
results = []
for i in range(5):
res = pool.apply_async(func, (i,), callback=callback)
results.append(res)
pool.close()
pool.join()
def main():
print('#'*30)
print('Calling pool test with bad function')
print('#'*30)
pool_test(bad_test_func)
print('#'*30)
print('Calling pool test with good function')
print('#'*30)
pool_test(good_test_func)
if __name__ == '__main__':
main()
希望这会有所帮助,并为您指明正确的方向.
Hopefully this helpful and points you in the right direction.
这篇关于未调用apply_async回调函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!