我正在尝试通过 XCom 将字符串列表从一个任务传递到另一个任务,但我似乎无法将推送列表解释为列表。

例如,当我在 blah 中运行的某个函数 ShortCircuitOperator 中执行此操作时:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)

然后我想使用这样的列表作为运算符的参数。例如,
run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

我希望 input_paths 等于 paths 但它不是因为渲染首先发生然后分配,并且模板渲染在某种程度上将 xcom_pull 返回转换为字符串化列表(然后我的 AfterBlahOperator 插入将其分配为元素的值JSON。

我尝试将 paths 连接成一个由某个分隔符分隔的字符串,然后将其推送到 XCom,然后在从 XCom 拉出时将其拆分回来,但由于 XCom 首先呈现,我得到,当 split 函数在内部调用时的字符串化列表如果 paths 函数应用于参数(如在 split 中),则为模板或 "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';') 的原始连接字符串。

当提取的值可以进一步处理时,XCom 似乎非常适合作为任务参数的单个值或多个值,但不能将 multiple_values 转换为“一个”作为任务的参数。

有没有办法做到这一点而不必编写一个额外的函数来精确返回这样的字符串列表?
或者也许我滥用 XCom 太多了,但是 Airflow 中有许多操作符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是先前任务的结果,因此事先不知道)。

最佳答案

Jinja 渲染字符串,所以如果你通过模板获取 XCom,它总是一个字符串。相反,您将需要获取您有权访问 TaskInstance 对象的 XCom。像这样的东西:

class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...

这类似于在 PythonOperator 中获取它的方式,XCom docs 提供了一个示例。

请注意,当它可以在 DAG 中进行硬编码时,您仍然可以支持单独的 input_paths 参数,您只需要额外检查以查看从哪个参数读取值。

关于python - 将字符串列表作为 Airflow 中依赖任务的参数传递,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47052582/

10-10 12:37