本文介绍了如何在日志中查看 MySqlHook 结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 MySqlHookairflow_db 建立连接,我正在执行一些查询,但我需要在某处查看查询结果(比如日志),我怎么看?

这是示例代码

I am using MySqlHook to establish connection from airflow_db, and I am performing some query, but I need to see the result of the query somewhere ( let say log), how can I see?

Here is the sample code

t1 = MySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)

推荐答案

MySQL 操作符当前(在撰写本文时为 airflow 1.10.1)不支持在 XCom 中返回任何内容,因此目前对您的修复是自己写一个小算子.您可以直接在 DAG 文件中执行此操作:

The MySQL operator currently (airflow 1.10.1 at time of writing) doesn't support returning anything in XCom, so the fix for you, for now, is to write a small operator yourself. You can do this directly in your DAG file:

from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)

t1 = ReturningMySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)

def get_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='basic_mysql')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    # Get data in your logs
    logging.info(string_to_print)

t2 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_records,
    dag=dag)

t1 >> t2

这篇关于如何在日志中查看 MySqlHook 结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 21:48