我有一个回测引擎,我尝试使用dask进行并行化。我可以独立处理每个星期的数据,因此我认为可以通过解雇一个可以创建回溯引擎的完整实例并在一周的数据上运行它的工作程序来轻松实现并行化。

这是我根据Dask Futures docs尝试的大纲代码。

from dask.distributed import Client
from backtest_engine import *

def run_backtest(start_date, end_date):
    engine = backtest_engine()
    engine.price_spread = 2
    engine.emulate_ticks = True
    engine.run_walk_forward(start_date, end_date, 'my_market', my_strategy_class)
    return True

if __name__ == "__main__":
    client = Client()
    a = client.submit(run_backtest, datetime(2017,9,3), datetime(2017,9,9))
    b = client.submit(run_backtest, datetime(2017,9,10), datetime(2017,9,17))


代码在通过以下方式调用client.submit()时失败:

_pickle.PicklingError: Could not pickle object as excessively deep recursion required.

代码是否存在问题,或者我的方法在某种程度上存在根本性缺陷,或者我可以在一个工人中进行的调用受到限制?

伊恩

对于背景:run_walk_forward()将HD5文件加载到Pandas数据帧中,在该数据帧上进行迭代以产生结果,然后将结果写入磁盘。

最佳答案

Dask使用cloudpickle进行功能序列化。我建议您对函数和cloudpickle.loads(cloudpickle.dumps(obj))的每个参数尝试使用client.submit

例如,可能是某些与您的功能有关的内容(例如backtest_engine)依赖于不容易序列化并在计算机之间发送的锁定或打开文件。

09-03 18:24