Problem description

This is actually two questions combined into one.

My AIRFLOW_HOME is structured like this:

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils

I've been following astronomer.io's examples at https://github.com/airflow-plugins. My custom operators use my custom hooks, and all the imports are relative to the top-level plugins folder:

# my_operator.py
from plugins.hooks.my_hook import MyHook

However, when I moved my entire repository into the plugins folder, running airflow list_dags fails with an import error saying that plugins cannot be found.

I read up on it a little, and apparently Airflow loads plugins into its core module, so they can be imported like this:

# my_operator.py
from airflow.hooks.my_hook import MyHook

So I changed all the imports to read directly from airflow.plugin_type instead. I get another import error though, this time saying that my_hook cannot be found. I restart my workers, scheduler and webserver every time but that doesn't seem to be the issue. I've looked at solutions proposed in similar questions and they don't work either.

The official documentation also shows this way https://airflow.apache.org/plugins.html of extending the AirflowPlugin class, but I'm not sure where this "interface" should reside. I would also prefer a drag and drop option.

Finally, it clearly doesn't make sense for my code repo to be the plugins folder itself, but if I separate them testing becomes inconvenient. Do I have to modify my Airflow configurations to point to my repo every time I run unit tests on my hooks/ops? What are the best practices for testing custom plugins?

Recommended answer

I figured this out through trial and error. This is the final structure of my AIRFLOW_HOME folder:

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils

In plugin_name.py, I extend the AirflowPlugin class:

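# plugin_name.py

from airflow.plugins_manager import AirflowPlugin

# Airflow puts the plugins folder on sys.path, so hooks/, operators/ and
# utils/ can be imported here as top-level packages.
from hooks.my_hook import MyHook
from operators.my_operator import MyOperator
from utils.my_utils import my_util_func
# etc

class PluginName(AirflowPlugin):

    # In Airflow 1.x this name becomes part of the import path used in DAGs,
    # e.g. airflow.operators.plugin_name
    name = 'plugin_name'

    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]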

In my custom operators that use my custom hooks, I import the hooks like this:

# my_operator.py

from hooks.my_hook import MyHook
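Why does from hooks.my_hook import MyHook work when from plugins.hooks.my_hook did not? Airflow adds the configured plugins folder itself (not AIRFLOW_HOME) to sys.path, so hooks, operators and utils become top-level packages while plugins does not. The sketch below reproduces that with only the standard library: it builds a throwaway copy of the layout in a temporary directory, puts the plugins folder on sys.path, and tries both import styles. The temporary layout and the empty MyHook class are illustrative stand-ins, not real plugin code.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Throwaway copy of the layout above: <tmp>/plugins/hooks/my_hook.py
root = Path(tempfile.mkdtemp())
plugins_dir = root / "plugins"
hooks_dir = plugins_dir / "hooks"
hooks_dir.mkdir(parents=True)
(plugins_dir / "__init__.py").write_text("")
(hooks_dir / "__init__.py").write_text("")
(hooks_dir / "my_hook.py").write_text("class MyHook:\n    pass\n")

# Mimic Airflow: the plugins folder itself goes on sys.path, not its parent.
# (Inserted at the front here so nothing else named "hooks" shadows it.)
sys.path.insert(0, str(plugins_dir))
importlib.invalidate_caches()

# Resolves: "hooks" is now a top-level package.
from hooks.my_hook import MyHook

# Fails: <tmp>, the parent of plugins/, is not on sys.path.
try:
    importlib.import_module("plugins.hooks.my_hook")
    plugins_prefix_works = True
except ModuleNotFoundError:
    plugins_prefix_works = False

print(MyHook.__module__, plugins_prefix_works)  # hooks.my_hook False
```

This is the same ModuleNotFoundError the question hit with from plugins.hooks.my_hook import MyHook.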

Then in my DAG files, I can do

# sample_dag.py

from airflow.operators.plugin_name import MyOperator
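The answer does not say why airflow.operators.plugin_name works while airflow.hooks.my_hook failed in the question. Roughly, Airflow 1.x builds one in-memory module per plugin, named after the plugin's name attribute (airflow.operators.plugin_name, airflow.hooks.plugin_name, and so on), and registers it in sys.modules; nothing is created under the individual file names. The sketch below is a toy model of that idea, not Airflow's code: AirflowPlugin, make_module, integrate_plugins and the toy_airflow namespace are hypothetical stand-ins so it runs without Airflow installed. Airflow 2.0 removed this import path, and plugin operators and hooks are imported there as regular Python modules.

```python
import importlib
import sys
import types


class AirflowPlugin:
    """Toy stand-in for airflow.plugins_manager.AirflowPlugin."""
    name = None
    operators = []


class MyOperator:
    """Hypothetical operator; in the real repo it lives in operators/my_operator.py."""


class PluginName(AirflowPlugin):
    name = "plugin_name"
    operators = [MyOperator]


def make_module(name, objects):
    """Create an in-memory module whose attributes are the objects, keyed by __name__."""
    module = types.ModuleType(name)
    module.__dict__.update((obj.__name__, obj) for obj in objects)
    return module


def integrate_plugins(namespace, plugins):
    """Expose each plugin's operators as <namespace>.operators.<plugin.name>."""
    root = sys.modules.setdefault(namespace, types.ModuleType(namespace))
    pkg_name = namespace + ".operators"
    operators_pkg = sys.modules.setdefault(pkg_name, types.ModuleType(pkg_name))
    root.operators = operators_pkg
    for plugin in plugins:
        module = make_module(pkg_name + "." + plugin.name, plugin.operators)
        sys.modules[module.__name__] = module
        setattr(operators_pkg, plugin.name, module)


integrate_plugins("toy_airflow", [PluginName])

# Works: the generated module is named after PluginName.name ...
from toy_airflow.operators.plugin_name import MyOperator as ImportedOperator

# ... but nothing is registered under the file name my_operator.
try:
    importlib.import_module("toy_airflow.operators.my_operator")
    file_name_import_works = True
except ModuleNotFoundError:
    file_name_import_works = False

print(ImportedOperator is MyOperator, file_name_import_works)  # True False
```

The plugin-named module resolves, while the file-named one raises ModuleNotFoundError, much like the my_hook error described in the question.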

This also makes testing easier, since the imports within the custom classes are relative to the submodules in the plugins folder. I wonder whether I can omit the __init__.py file inside plugins, but since everything works, I didn't try.
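On the testing question: because the plugin code imports hooks.my_hook rather than plugins.hooks.my_hook, tests only need the plugins folder on sys.path; airflow.cfg does not have to point at the repo. A minimal sketch, assuming the tests run under pytest and a conftest.py sits next to the plugins/ folder (a hypothetical layout). The hooks themselves still import Airflow, so it must be installed in the test environment.

```python
# conftest.py, placed next to the plugins/ folder (hypothetical layout).
# pytest loads this file before importing the test modules, so tests can use
# the same imports as the plugin code, e.g. `from hooks.my_hook import MyHook`.
import sys
from pathlib import Path

# Resolve plugins/ relative to this file so tests work from any working directory.
PLUGINS_DIR = Path(__file__).resolve().parent / "plugins"

# Prepend once; repeated imports of conftest.py will not duplicate the entry.
if str(PLUGINS_DIR) not in sys.path:
    sys.path.insert(0, str(PLUGINS_DIR))
```

This mirrors what Airflow does at runtime (adding the plugins folder to sys.path), which keeps the import paths identical in production and in tests.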

10-10 05:16