前言
- 试验开始使用的
MySQL 8
作为数据库,截至 2021.5.13,airflow 2.0.2 的这个问题未解决,所以转为使用PostgreSQL 12
- airflow 是 DAG(有向无环图)的任务管理系统,简单的理解就是一个高级版的 crontab。
- airflow 解决了 crontab 无法解决的任务依赖问题。
- airflow 基本架构
- airflow + celery 架构
环境与组件
- Ubuntu 20.04
- Python-3.8(Anaconda3-2020.11-Linux-x86_64)
- PostgreSQL 12.6
- apache-airflow 2.0.2
- celery 4.4.7
集群规划
安装步骤
创建账号(node0/node1/node2)
sudo useradd airflow -m -s /bin/bash sudo passwd airflow
切换账号(node0/node1/node2)
su airflow
配置 Anaconda 环境变量(node0/node1/node2)
# /home/airflow/.bashrc export PATH=/home/airflow/anaconda3/bin:$PATH
升级 pip(node0/node1/node2)
pip install pip --upgrade -i https://mirrors.aliyun.com/pypi/simple/
配置 pip 国内镜像(node0/node1/node2)
pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/
安装 airflow(node0/node1/node2)依赖项:https://airflow.apache.org/do...
# 全家桶(master) pip3 install "apache-airflow[all]~=2.0.2" # OR 选择性安装 pip3 install "apache-airflow[async,postgres,mongo,redis,rabbitmq,celery,dask]~=2.0.2"
为 airflow 添加 PATH 环境变量(node0/node1/node2)
# 在 /home/airflow/.bashrc 文件尾追加以下内容: export PATH=/home/airflow/.local/bin:$PATH
查看 airflow 版本并创建 airflow 的 HOME 目录(node0/node1/node2)
# 默认 ~/airflow 目录 airflow version
设置 Ubuntu 系统时区(node0/node1/node2)
timedatectl set-timezone Asia/Shanghai
修改 airflow 中的时区(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)
[core] # 改为 system 或 Asia/Shanghai default_timezone = system
- 至此,安装完毕
PostgreSQL 配置
创建数据库
CREATE DATABASE airflow_db;
创建用户
CREATE USER airflow_user WITH PASSWORD 'airflow_pass'; GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
修改 PostgreSQL 连接(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)
[core] sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow
初始化数据库表(node0)
airflow db init
- 查看数据库是否初始化成功
WEB UI 登录
创建管理员用户(node0)
# 角色表: ab_role # 用户表: ab_user # 创建 Admin 角色用户 airflow users create \ --lastname user \ --firstname admin \ --username admin \ --email [email protected] \ --role Admin \ --password admin123 # 创建 Viewer 角色用户 airflow users create \ --lastname user \ --firstname view \ --username view \ --email [email protected] \ --role Viewer \ --password view123
启动
webserver
(node0)airflow webserver -p 8080
- 在浏览器使用创建的账号登录
配置 CeleryExecutor
- celery 官方文档:https://docs.celeryproject.or...
- 在 RabbitMQ 上创建 airflow 账号,并分配 virtual host
修改配置文件(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)
[core] executor = CeleryExecutor [celery] broker_url = amqp://airflow:[email protected]:5672/vhost_airflow result_backend = db+postgresql://airflow:[email protected]/airflow
测试案例
创建测试脚本(/home/airflow/airflow/dags/send_msg.py)(node0/node1/node2) ,发送本机 IP 到企业微信。
# encoding: utf-8 # author: qbit # date: 2021-05-13 # summary: 发送/分配任务到任务结点 import os import time import json import psutil import requests from datetime import timedelta from airflow.utils.dates import days_ago from airflow.models import DAG from airflow.operators.python_operator import PythonOperator def GetLocalIPByPrefix(prefix): r""" 多网卡情况下,根据前缀获取IP 测试可用:Windows、Linux,Python 3.6.x,psutil 5.4.x ipv4/ipv6 地址均适用 注意如果有多个相同前缀的 ip,只随机返回一个 """ localIP = '' dic = psutil.net_if_addrs() for adapter in dic: snicList = dic[adapter] for snic in snicList: if not snic.family.name.startswith('AF_INET'): continue ip = snic.address if ip.startswith(prefix): localIP = ip return localIP def send_msg(msg='default msg', **context): r""" 发送 message 到企业微信 """ print(context) run_id = context['run_id'] nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) message = '%s\n%s\n%s_%d\n%s' % ( run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg) print(message) ''' 发送代码(涉及账号,本段代码隐藏) ''' default_args = { 'owner': 'qbit', # depends_on_past 是否依赖于过去。 # 如果为True,那么必须要上次的 DAG 执行成功了,这次的 DAG 才能执行。 'depends_on_past': False } with DAG(dag_id='send_msg', default_args=default_args, start_date=days_ago(1), schedule_interval=timedelta(seconds=60), # catchup 是否回补(backfill)开始时间到现在的任务 catchup=False, tags=['qbit'] ) as dag: first = PythonOperator( task_id='send_msg_1', python_callable=send_msg, op_kwargs={'msg': '111'}, provide_context=True, dag=dag, ) second = PythonOperator( task_id='send_msg_2', python_callable=send_msg, op_kwargs={'msg': '222'}, provide_context=True, dag=dag, ) third = PythonOperator( task_id='send_msg_3', python_callable=send_msg, op_kwargs={'msg': '333'}, provide_context=True, dag=dag, ) [third, first] >> second
查看 dag 信息(node0)
# 打印出所有正在活跃状态的 DAGs $ airflow dags list # 打印出 'send_msg' DAG 中所有的任务 $ airflow tasks list send_msg [2021-05-13 16:00:47,123] {dagbag.py:451} INFO - Filling up the DagBag from /home/airflow/airflow/dags send_msg_1 send_msg_2 send_msg_3 # 打印出 'send_msg' DAG 的任务层次结构 $ airflow tasks list send_msg --tree
测试单个 task(node0)
airflow tasks test send_msg send_msg_1 20210513
测试单个 dag(node0)
airflow dags test send_msg 20210513
集群测试
# node0 airflow webserver -p 8080 airflow scheduler airflow celery flower # 默认端口 5555 # node1/node2 airflow celery worker # 指定 hostname 启动 airflow celery worker --celery-hostname node1
参考文献
- 相关阅读:Airflow2.0.0 + Celery 集群搭建
- 对比 airflow 中的 CeleryExecutor 和 DaskExecutor:A Gentle Introduction To Understand Airflow Executor