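Hi, I'm running a task with PythonOperator. The task itself appears to run fine, and the returned value is what I expect (a large XML output from an API call). However, I get ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison.
My Python callable returns a value, so I assume an XCom push happens and it is trying to serialize the output so that downstream operators can pull it. I'm not sure how to fix this, though, because I don't see any configuration for either a) increasing the recursion depth of the pickle serializer (as suggested here) or b) any error handling during the XCom push.
My full traceback is below: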
INFO - Subtask: [2017-11-08 14:00:14,545] {models.py:1342} INFO - Executing <Task(PythonOperator): test_task_xml> on 2017-11-07 00:00:00
INFO - Subtask: [2017-11-08 14:00:31,817] {python_operator.py:81} INFO - Done. Returned value was: <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>
INFO - Subtask: [2017-11-08 14:00:31,839] {models.py:1417} ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison [SQL: 'INSERT INTO xcom (key, value, timestamp, execution_date, task_id, dag_id) VALUES (%(key)s, %(value)s, now(), %(execution_date)s, %(task_id)s, %(dag_id)s) RETURNING xcom.id'] [parameters: [{'dag_id': 'test_dag', 'key': 'return_value', 'value': <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>, 'task_id': 'test_task_xml', 'execution_date': datetime.datetime(2017, 11, 7, 0, 0)}]]
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context
INFO - Subtask: context = constructor(dialect, self, conn, *args)
INFO - Subtask: File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in _init_compiled
INFO - Subtask: for key in compiled_params
INFO - Subtask: File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in <genexpr>
INFO - Subtask: for key in compiled_params
INFO - Subtask: File "/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 1516, in process
INFO - Subtask: value = dumps(value, protocol)
INFO - Subtask: dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
INFO - Subtask: File "/lib/python3.6/site-packages/dill/dill.py", line 274, in dump
INFO - Subtask: pik.dump(obj)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 409, in dump
INFO - Subtask: self.save(obj)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask: self.save_reduce(obj=obj, *rv)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask: save(state)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask: f(self, obj) # Call unbound method with explicit self
INFO - Subtask: File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
INFO - Subtask: StockPickler.save_dict(pickler, obj)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 821, in save_dict
INFO - Subtask: self._batch_setitems(obj.items())
INFO - Subtask: File "/lib/python3.6/pickle.py", line 847, in _batch_setitems
INFO - Subtask: save(v)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask: f(self, obj) # Call unbound method with explicit self
INFO - Subtask: File "/lib/python3.6/pickle.py", line 781, in save_list
INFO - Subtask: self._batch_appends(obj)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 805, in _batch_appends
INFO - Subtask: save(x)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask: self.save_reduce(obj=obj, *rv)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask: save(state)
INFO - Subtask: File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask: f(self, obj) # Call unbound method with explicit self
INFO - Subtask: File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
Best answer
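I found that there is a limit on pickled Python objects, imposed by the database's BLOB (binary large object) column. To work around this, you can do one of the following:
Try Fileflow.
Dump the output to a file in a temporary folder and push only the file path through XCom.
Handle the whole process in a single task and push only a few small values through XCom.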
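
The file-path approach can be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: fetch_xml is a hypothetical stand-in for the real API call, and the function names, file prefix, and XML layout are assumptions based on the log output in the question. The first callable writes the payload to a temporary file and returns only the path, so the value stored in the xcom table is a short string. The second callable pulls that path and returns a single small value.

```python
import os
import tempfile
import xml.etree.ElementTree as ET


def fetch_xml():
    """Hypothetical stand-in for the real API call in the question."""
    return "<QueryResult><Query><Answer>12345</Answer></Query></QueryResult>"


def fetch_and_store(**context):
    """Write the payload to disk and return only its path.

    PythonOperator pushes the callable's return value to XCom, so only
    this short path string ends up being serialized into the database.
    """
    payload = str(fetch_xml())  # store plain text, not a parser object
    fd, path = tempfile.mkstemp(prefix="test_task_xml_", suffix=".xml")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(payload)
    return path


def read_answer(**context):
    """Pull the path from XCom, parse the file, and return one small value."""
    # "test_task_xml" is the upstream task_id taken from the question's log.
    path = context["ti"].xcom_pull(task_ids="test_task_xml")
    root = ET.parse(path).getroot()
    return root.findtext("./Query/Answer")
```

Both functions would be passed as python_callable to a PythonOperator. On the Airflow 1.x releases that match the log above, the operator likely also needs provide_context=True so that ti is available in the keyword arguments. A local temporary file only works if both tasks run on the same machine; with distributed workers the path would have to point at shared storage instead.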