我有一个加载数据并在时间中循环的功能,例如

def calculate_profit(account):
    account_data = load(account) #very expensive operation
    for day in account_data.days:
        print(account_data.get(day).profit)


因为数据的加载是昂贵的,所以使用joblib / multiprocessing进行如下操作很有意义:

arr = [account1, account2, account3, ...]
joblib.Parallel(n_jobs=-1)(delayed(calculate_profit)(arr))


但是,我想将另一个昂贵的函数应用于calculate_profit函数的中间结果。例如,假设将所有利润汇总并处理/将其发布到网站/等是一项昂贵的操作。我还需要前一天的利润来计算此函数中的利润变化。

def expensive_sum(prev_day_profits, *account_profits):
    total_profit_today = sum(account_profits)
    profit_difference = total_profit_today - prev_day_profits

    #some other expensive operation
    #more expensive operations


所以我想


并行运行多处理流程(以减轻所有昂贵的帐户数据中的加载负担)
一旦每个多处理过程都达到预定点(例如完成循环的一次迭代),则将这些中间值返回给另一个函数(expensive_sum)进行处理-假定每个单独的多处理过程在expensive_sum返回之前不能继续进行
但是,我想保持多处理进程的生命,这样就不必重新初始化它们(减少了开销)


有什么办法吗?

最佳答案

from multiprocessing import Manager
queue = manager.Queue()



  一旦每个多处理过程达到预定点
  做


queue.put(item)


同时另一个昂贵的功能是

queue.get(item)  ==>  blocking call for get


昂贵的函数在get上等待,并在获取值时继续处理,然后再次在get上等待

关于python - 多处理过程中间输出,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45195746/

10-11 10:24
查看更多