本文介绍了(Django)气流中的ORM-有可能吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何在Airflow任务中使用Django模型?

How to work with Django models inside Airflow tasks?

根据Airflow官方文档,Airflow提供了用于与数据库进行交互的挂钩(例如MySqlHook/PostgresHook/等),这些挂钩以后可在Operators中用于执行行查询.附加核心代码片段:

According to official Airflow documentation, Airflow provides hooks for interaction with databases (like MySqlHook / PostgresHook / etc) that can be later used in Operators for row query execution. Attaching the core code fragments:

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn

https://airflow.apache.org/_modules/mysql_operator.html

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)

我们可以看到Hook封装了连接配置,而Operator提供了执行自定义查询的功能.

As we can see Hook incapsulates the connection configuration while Operator provides ability to execute custom queries.

出于以下原因,使用不同的ORM代替原始SQL来获取和处理数据库对象非常方便:

It's very convenient to use different ORM for fetching and processing database objects instead of raw SQL for the following reasons:

  1. 在简单的情况下,ORM可能是更方便的解决方案,请参见 ORM定义
  2. 假定已经建立了诸如Django之类的系统,这些系统具有定义的模型及其方法.这些模型的模式每次更改时,都需要重写气流原始SQL查询. ORM提供了用于处理此类模型的统一界面.

由于某些原因,在挂钩任务和运算符方面,没有在气流任务中使用ORM的示例.根据在Django之外使用Django数据库层?问题,是否需要设置到数据库的连接配置,然后直接在ORM中执行查询,但是在适当的钩子/运算符外部执行此操作会中断Airflow 原理.就像使用"python work_with_django_models.py" 命令调用BashOperator.

For some reason, there are no examples of working with ORM in Airflow tasks in terms of hooks and operators. According to Using Django database layer outside of Django? question, it's needed to set up a connection configuration to the database, and then straight-forwardly execute queires in ORM, but doing that outside appropriate hooks / operators breaks Airflow principles. It's like calling BashOperator with "python work_with_django_models.py" command.

那么在这种情况下最好的做法是什么?我们是否为Django ORM/其他ORM共享任何钩子/运算符?为了使以下代码真实(将其视为伪代码!):

So what are the best practisies in this case? Do we share any hooks / operators for Django ORM / other ORMs? In order to have the following code real (treat as pseudo-code!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
    all_objects[15].my_int_field = 25
    all_objects[15].save()
    return list(all_objects)

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')

而不是在原始SQL中实现此功能.

instead of implementing this functionality in raw SQL.

我认为这是一个非常重要的话题,因为在这种情况下,整个基于ORM的框架和流程都无法深入到Airflow中.

I think it's pretty important topic, as the whole banch of ORM-based frameworks and processes are not able to dive into Airflow in this case.

提前谢谢!

推荐答案

我同意我们应该继续进行此讨论,因为访问Django ORM可以大大降低解决方案的复杂性.

I agree we should continue to have this discussion as having access Django ORM can significantly reduce complexity of solutions.

我的方法是1)创建一个DjangoOperator

My approach has been to 1) create a DjangoOperator

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()

和2)将DjangoOperator扩展为逻辑/运算符,这将从访问ORM中受益匪浅

and 2) Extend that DjangoOperator for logic / operators what would benefit from having access to ORM

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()

使用此策略,您可以然后区分使用Raw SQL/ORM的运算符.还要注意,对于Django运算符,所有django模型导入都必须在执行上下文中,如上所示.

With this strategy, you can then distinguish between operators that use Raw SQL / ORM. Also note, that for the Django operator, all django model imports need to be within the execution context, demonstrated above.

这篇关于(Django)气流中的ORM-有可能吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-03 02:56