前言

  • 试验开始使用的 MySQL 8 作为数据库,截至 2021.5.13,airflow 2.0.2 的这个问题未解决,所以转为使用 PostgreSQL 12
  • airflow 是 DAG(有向无环图)的任务管理系统,简单的理解就是一个高级版的 crontab。
  • airflow 解决了 crontab 无法解决的任务依赖问题。
  • airflow 基本架构
  • airflow + celery 架构

环境与组件

  • Ubuntu 20.04
  • Python-3.8(Anaconda3-2020.11-Linux-x86_64)
  • PostgreSQL 12.6
  • apache-airflow 2.0.2
  • celery 4.4.7

集群规划

安装步骤

  • 创建账号(node0/node1/node2)

    sudo useradd airflow -m -s /bin/bash
    sudo passwd airflow
  • 切换账号(node0/node1/node2)

    su airflow
  • 配置 Anaconda 环境变量(node0/node1/node2)

    # /home/airflow/.bashrc
    export PATH=/home/airflow/anaconda3/bin:$PATH
  • 升级 pip(node0/node1/node2)

    pip install pip --upgrade  -i https://mirrors.aliyun.com/pypi/simple/
  • 配置 pip 国内镜像(node0/node1/node2)

    pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/
  • 安装 airflow(node0/node1/node2)依赖项:https://airflow.apache.org/do...

    # 全家桶(master)
    pip3 install "apache-airflow[all]~=2.0.2"
    # OR 选择性安装
    pip3 install "apache-airflow[async,postgres,mongo,redis,rabbitmq,celery,dask]~=2.0.2"
  • 为 airflow 添加 PATH 环境变量(node0/node1/node2)

    # 在 /home/airflow/.bashrc 文件尾追加以下内容:
    export PATH=/home/airflow/.local/bin:$PATH
  • 查看 airflow 版本并创建 airflow 的 HOME 目录(node0/node1/node2)

    # 默认 ~/airflow 目录
    airflow version
  • 设置 Ubuntu 系统时区(node0/node1/node2)

    timedatectl set-timezone Asia/Shanghai
  • 修改 airflow 中的时区(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

    [core]
    # 改为 system 或 Asia/Shanghai
    default_timezone = system
  • 至此,安装完毕

PostgreSQL 配置

  • 创建数据库

    CREATE DATABASE airflow_db;
  • 创建用户

    CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
    GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
  • 修改 PostgreSQL 连接(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

    [core]
    sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow
  • 初始化数据库表(node0)

    airflow db init
  • 查看数据库是否初始化成功

WEB UI 登录

  • 创建管理员用户(node0)

    # 角色表: ab_role
    # 用户表: ab_user
    # 创建 Admin 角色用户
    airflow users create \
     --lastname user \
     --firstname admin \
     --username admin \
     --email [email protected] \
     --role Admin \
     --password admin123
    # 创建 Viewer 角色用户
    airflow users create \
     --lastname user \
     --firstname view \
     --username view \
     --email [email protected] \
     --role Viewer \
     --password view123
  • 启动 webserver(node0)

    airflow webserver -p 8080
  • 在浏览器使用创建的账号登录

配置 CeleryExecutor

  • celery 官方文档:https://docs.celeryproject.or...
  • 在 RabbitMQ 上创建 airflow 账号,并分配 virtual host
  • 修改配置文件(/home/airflow/airflow/airflow.cfg)(node0/node1/node2)

    [core]
    executor = CeleryExecutor
    [celery]
    broker_url = amqp://airflow:[email protected]:5672/vhost_airflow
    result_backend = db+postgresql://airflow:[email protected]/airflow

测试案例

  • 创建测试脚本(/home/airflow/airflow/dags/send_msg.py)(node0/node1/node2) ,发送本机 IP 到企业微信。

    # encoding: utf-8
    # author: qbit
    # date: 2021-05-13
    # summary: 发送/分配任务到任务结点
    
    import os
    import time
    import json
    import psutil
    import requests
    from datetime import timedelta
    from airflow.utils.dates import days_ago
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    
    def GetLocalIPByPrefix(prefix):
      r"""
    多网卡情况下,根据前缀获取IP
    测试可用:Windows、Linux,Python 3.6.x,psutil 5.4.x
    ipv4/ipv6 地址均适用
    注意如果有多个相同前缀的 ip,只随机返回一个
    """
      localIP = ''
      dic = psutil.net_if_addrs()
      for adapter in dic:
          snicList = dic[adapter]
          for snic in snicList:
              if not snic.family.name.startswith('AF_INET'):
                  continue
              ip = snic.address
              if ip.startswith(prefix):
                  localIP = ip
    
      return localIP
    
    
    def send_msg(msg='default msg', **context):
      r""" 发送 message 到企业微信 """
      print(context)
      run_id = context['run_id']
      nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
      message = '%s\n%s\n%s_%d\n%s' % (
          run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg)
      print(message)
    
      '''
      发送代码(涉及账号,本段代码隐藏)
      '''
    
    default_args = {
      'owner': 'qbit',
      # depends_on_past 是否依赖于过去。
      # 如果为True,那么必须要上次的 DAG 执行成功了,这次的 DAG 才能执行。
      'depends_on_past': False
    }
    
    with DAG(dag_id='send_msg',
           default_args=default_args,
           start_date=days_ago(1),
           schedule_interval=timedelta(seconds=60),
           # catchup 是否回补(backfill)开始时间到现在的任务
           catchup=False,
           tags=['qbit']
    ) as dag:
      first = PythonOperator(
          task_id='send_msg_1',
          python_callable=send_msg,
          op_kwargs={'msg': '111'},
          provide_context=True,
          dag=dag,
      )
    
      second = PythonOperator(
          task_id='send_msg_2',
          python_callable=send_msg,
          op_kwargs={'msg': '222'},
          provide_context=True,
          dag=dag,
      )
    
      third = PythonOperator(
          task_id='send_msg_3',
          python_callable=send_msg,
          op_kwargs={'msg': '333'},
          provide_context=True,
          dag=dag,
      )
    
      [third, first] >> second
  • 查看 dag 信息(node0)

    # 打印出所有正在活跃状态的 DAGs
    $ airflow dags list
    
    # 打印出 'send_msg' DAG 中所有的任务
    $ airflow tasks list send_msg
    [2021-05-13 16:00:47,123] {dagbag.py:451} INFO - Filling up the DagBag from /home/airflow/airflow/dags
    send_msg_1
    send_msg_2
    send_msg_3
    
    # 打印出 'send_msg' DAG 的任务层次结构
    $ airflow tasks list send_msg --tree
  • 测试单个 task(node0)

    airflow tasks test send_msg send_msg_1 20210513
  • 测试单个 dag(node0)

    airflow dags test send_msg 20210513
  • 集群测试

    # node0
    airflow webserver -p 8080
    airflow scheduler
    airflow celery flower  # 默认端口 5555
    # node1/node2
    airflow celery worker
    # 指定 hostname 启动
    airflow celery worker --celery-hostname node1

参考文献

03-05 15:55