问题描述
这实际上是两个问题合而为一。
This is actually two questions combined into one.
我的 AIRFLOW_HOME
的结构类似于
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
我一直在这里关注astronomer.io的示例。我的自定义操作员
使用我的自定义钩子
,并且所有导入都相对于顶级文件夹插件
。
I've been following astronomer.io's examples here https://github.com/airflow-plugins. My custom operators
use my custom hooks
, and all the imports are relative to the top level folder plugins
.
# my_operator.py
from plugins.hooks.my_hook import MyHook
但是,当我尝试将整个存储库移入plugins文件夹时,运行 airflow list_dags
后出现导入错误,说找不到 plugins
。
However, when I tried moving my entire repository into the plugins folder, I get an import error after running airflow list_dags
saying that plugins
cannot be found.
我阅读了一下,显然Airflow将插件加载到其核心模块中,因此可以像
I read up a little about it and apparently Airflow loads the plugins into its core module so they can be imported like
# my_operator.py
from airflow.hooks.my_hook import MyHook
因此,我将所有导入更改为直接从 airflow.plugin_type
读取。但是,我收到另一个导入错误,这次是说找不到 my_hook
。我每次都重新启动工作程序,调度程序和网络服务器,但这似乎不是问题。我已经看过在类似问题中提出的解决方案,它们也不起作用。
So I changed all the imports to read directly from airflow.plugin_type
instead. I get another import error though, this time saying that my_hook
cannot be found. I restart my workers, scheduler and webserver every time but that doesn't seem to be the issue. I've looked at solutions proposed in similar questions and they don't work either.
官方文档也以这种方式显示扩展了 AirflowPlugin
类,但我不确定该接口应位于何处。我还希望使用拖放选项。
The official documentation also shows this way https://airflow.apache.org/plugins.html of extending the AirflowPlugin
class, but I'm not sure where this "interface" should reside. I would also prefer a drag and drop option.
最后,对于我的代码存储库来说,插件显然没有意义
Finally, it clearly doesn't make sense for my code repo to be the plugins
folder itself, but if I separate them testing becomes inconvenient. Do I have to modify my Airflow configurations to point to my repo every time I run unit tests on my hooks/ops? What are the best practices for testing custom plugins?
推荐答案
我通过反复试验弄清楚了这一点。这是我的 AIRFLOW_HOME
文件夹的最终结构
I figured this out by doing some trial and error. This is the final structure of my AIRFLOW_HOME
folder
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- plugin_name.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
在 plugin_name.py
中,我扩展了 AirflowPlugin
类
# plugin_name.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc
class PluginName(AirflowPlugin):
name = 'plugin_name'
hooks = [MyHook]
operators = [MyOperator]
macros = [my_util_func]
在使用自定义钩子的自定义运算符中,我将其导入
In my custom operators which use my custom hooks, I import them like
# my_operator.py
from hooks.my_hook import MyHook
然后可以在我的DAG文件中
Then in my DAG files, I can do
# sample_dag.py
from airflow.operators.plugin_name import MyOperator
由于自定义类中的导入相对于文件夹 plugins
中的子模块,因此这也便于测试。我想知道是否可以省略 plugins
内的 __ init __。py
文件,但是由于一切正常,所以我没有尝试这样做。
This also facilitates testing since the imports within the custom classes are relative to the sub modules within the folder plugins
. I wonder if I can omit the __init__.py
file inside plugins
, but since everything is working I didn't try doing that.
这篇关于在Airflow中编写和导入自定义插件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!