Preface

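  • Airflow is a task management system built on DAGs (directed acyclic graphs); put simply, it is an advanced version of crontab.
  • Airflow handles dependencies between tasks, which crontab cannot.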

Environment and Components

  • Ubuntu 18.04
  • MySQL 5.7
  • Python 3.6.9
  • airflow 1.10.10
  • celery 4.4.2
  • RabbitMQ 3.6.10

Hands-on Steps

Basic Steps

  • Install pip for Python 3

    sudo apt install python3-pip
  • Install the MySQL development packages

    # Avoids the error: OSError: mysql_config not found
    sudo apt install libmysqlclient-dev python3-dev
  • Create an account on all 3 airflow servers

    sudo useradd airflow -m -s /bin/bash
    sudo passwd airflow
  • Run the following steps as the airflow user
  • Modify the PATH environment variable

    Append the following line to the end of /home/airflow/.bashrc:
    export PATH=/home/airflow/.local/bin:$PATH
  • Upgrade pip

    pip3 install pip --upgrade
  • Configure the Douban PyPI mirror

    pip3 config set global.index-url https://pypi.doubanio.com/simple/
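  • Install airflow on all 3 machines

    # Everything (master)
    pip3 install "apache-airflow[all]==1.10.*"
    # OR install only the extras that are needed
    pip3 install "apache-airflow[mysql,celery,rabbitmq]==1.10.*"
  • Check the airflow version; the first run also creates airflow's HOME directory

    # Defaults to ~/airflow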
    airflow version
  • Set the Ubuntu 18.04 system time zone

    timedatectl set-timezone Asia/Shanghai
  • Change the backend time zone (/home/airflow/airflow/airflow.cfg)

    [core]
    # Default timezone in case supplied date times are naive
    # can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
    # default_timezone = utc
    # Change to system or Asia/Shanghai
    default_timezone = system
  • Enable the RBAC UI and change the UI time zone

    [webserver]
    rbac=True
    default_ui_timezone = system
  • Change the MySQL connection (/home/airflow/airflow/airflow.cfg)

    [core]
    # MySQL connection string
    sql_alchemy_conn = mysql+pymysql://youruser:<password>@<mysql_host>/airflow
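
A password that contains characters such as `@` or `/` breaks the `sql_alchemy_conn` URL unless it is percent-encoded. The sketch below uses only the standard library to build such a URL. The helper name `build_sql_alchemy_conn`, the host `db.example.internal`, and the password are illustrative assumptions, not values from this setup.

```python
from urllib.parse import quote_plus


def build_sql_alchemy_conn(user, password, host, database, port=3306):
    """Return a mysql+pymysql:// URL with user and password percent-encoded."""
    return "mysql+pymysql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )


if __name__ == "__main__":
    # All values here are made-up placeholders.
    print(build_sql_alchemy_conn("youruser", "p@ss/word",
                                 "db.example.internal", "airflow"))
```

The same value can also be supplied through the `AIRFLOW__CORE__SQL_ALCHEMY_CONN` environment variable instead of airflow.cfg. If the encoded value contains `%`, it is worth checking how airflow.cfg treats that character before pasting it in.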
  • Install pymysql

    pip3 install pymysql
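  • Manually create the airflow database
  • Initialize the database tables

    airflow initdb
  • Check that the database was initialized successfully
  • Create users

    # Role table: ab_role
    # User table: ab_user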
    # Create a user with the Admin role (the email address is a placeholder)
    airflow create_user --lastname user \
     --firstname admin \
     --username admin \
     --email admin@example.com \
     --role Admin \
     --password admin123
    # Create a user with the Viewer role (the email address is a placeholder)
    airflow create_user --lastname user \
     --firstname view \
     --username view \
     --email view@example.com \
     --role Viewer \
     --password view123
  • Start the web server; the default port is 8080

    airflow webserver -p 8080
  • Start the scheduler

    airflow scheduler
  • Open 192.168.y.z:8080 in a browser to view the web UI

    After logging in
  • Create an airflow account on RabbitMQ and assign it a virtual host
  • Modify the master configuration file (/home/airflow/airflow/airflow.cfg)

    [core]
    executor = CeleryExecutor
    [celery]
    broker_url = amqp://mq_user:<mq_password>@<rabbitmq_host>:5672/vhost_airflow
    result_backend = db+mysql://youruser:<password>@<mysql_host>/airflow
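
Before copying the file to the nodes, it can help to confirm that the three settings above are actually present. The following is a minimal sketch that parses configuration text with the standard library's `configparser`. The function name `check_celery_settings` and the sample hosts and passwords are hypothetical, and this is not how Airflow itself loads its configuration.

```python
from configparser import ConfigParser

# Sample text with made-up hosts and passwords, mirroring the settings above.
SAMPLE_CFG = """
[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://mq_user:mq_password@mq.example.internal:5672/vhost_airflow
result_backend = db+mysql://youruser:yourpassword@db.example.internal/airflow
"""


def check_celery_settings(cfg_text):
    """Return a list of problems found in the executor/Celery settings."""
    parser = ConfigParser(interpolation=None)  # keep '%' characters literal
    parser.read_string(cfg_text)
    problems = []
    if parser.get("core", "executor", fallback="") != "CeleryExecutor":
        problems.append("core.executor is not CeleryExecutor")
    for key in ("broker_url", "result_backend"):
        if not parser.get("celery", key, fallback="").strip():
            problems.append("celery.{} is empty or missing".format(key))
    return problems


if __name__ == "__main__":
    print(check_celery_settings(SAMPLE_CFG) or "settings look complete")
```

To check a real file, read /home/airflow/airflow/airflow.cfg into a string and pass it to the function.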
  • Sync the configuration file from master to the nodes

    # Test
    rsync -v /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
    rsync -v /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
  • Script that avoids the password prompt

    # The password is exposed in plain text; not recommended for production
    sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
    sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
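
Since sshpass exposes the password, one alternative is key-based SSH login for the airflow user, after which plain rsync needs no password at all. The sketch below assumes the keys have already been distributed (for example with `ssh-copy-id`) and only builds and optionally runs the same rsync commands. The helper names `build_rsync_cmd` and `sync_all` are hypothetical, and `node1_ip`/`node2_ip` are the placeholders used in the commands above.

```python
import subprocess

CFG_PATH = "/home/airflow/airflow/airflow.cfg"
NODES = ["node1_ip", "node2_ip"]  # placeholders taken from the commands above


def build_rsync_cmd(node, src=CFG_PATH, user="airflow"):
    """Build the rsync argument list for one node (nothing is executed here)."""
    dest_dir = src.rsplit("/", 1)[0] + "/"
    return ["rsync", "-v", src, "{}@{}:{}".format(user, node, dest_dir)]


def sync_all(nodes=NODES, dry_run=True):
    """Print each command; run it only when dry_run is False."""
    cmds = [build_rsync_cmd(node) for node in nodes]
    for cmd in cmds:
        print(" ".join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # raises if rsync exits non-zero
    return cmds


if __name__ == "__main__":
    sync_all(dry_run=True)
```

Calling `sync_all(dry_run=False)` performs the copy; the dry run only prints the commands so they can be reviewed first.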
  • Create a test script (/home/airflow/airflow/dags/send_msg.py) that sends the local IP to WeCom (企业微信).

    # encoding: utf-8
    # author: qbit
    # date: 2020-04-02
    # summary: send/dispatch tasks to the task nodes
    
    import os
    import time
    import json
    import psutil
    import requests
    from datetime import timedelta
    from airflow.utils.dates import days_ago
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    default_args = {
      'owner': 'Airflow',
      # depends_on_past: whether a run depends on the previous one.
      # If True, a task runs only if its previous scheduled instance succeeded.
      'depends_on_past': False,
      'start_date': days_ago(1),
    }
    
    dag = DAG(
      dag_id='send_msg',
      default_args=default_args,
      # catchup: whether to backfill the runs between start_date and now
      catchup=False,
      start_date=days_ago(1),
      schedule_interval=timedelta(seconds=60),
      tags=['example']
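    )
    
    
    def GetLocalIPByPrefix(prefix):
      r"""
      With multiple network interfaces, get the IP that matches a prefix.
      Tested on: Windows, Linux, Python 3.6.x, psutil 5.4.x
      Works for both ipv4 and ipv6 addresses
      Note: if several IPs share the prefix, only one is returned (the last one found)
      """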
      localIP = ''
      dic = psutil.net_if_addrs()
      for adapter in dic:
          snicList = dic[adapter]
          for snic in snicList:
              if not snic.family.name.startswith('AF_INET'):
                  continue
              ip = snic.address
              if ip.startswith(prefix):
                  localIP = ip
    
      return localIP
    
    def send_msg(msg='default msg', **context):
      r""" Send a message to WeCom (企业微信) """
      print(context)
      run_id = context['run_id']
      nowTime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
      message =  '%s\n%s\n%s_%d\n%s' % (run_id, nowTime,
                          GetLocalIPByPrefix('192.168.'), os.getpid(), msg)
      print(message)
    
      '''
      The sending code goes here
      '''
    
    
    first = PythonOperator(
      task_id='send_msg_1',
      python_callable=send_msg,
      op_kwargs={'msg':'111'},
      provide_context=True,
      dag=dag,
    )
    
    second = PythonOperator(
      task_id='send_msg_2',
      python_callable=send_msg,
      op_kwargs={'msg':'222'},
      provide_context=True,
      dag=dag,
    )
    
    third = PythonOperator(
      task_id='send_msg_3',
      python_callable=send_msg,
      op_kwargs={'msg':'333'},
      provide_context=True,
      dag=dag,
    )
    
    [third, first] >> second
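
The script leaves the actual sending code as a placeholder. As one possible way to fill it, the sketch below assumes a WeCom group-robot webhook; the source does not say which WeCom API it uses, and the key in the URL is a placeholder. It uses the standard library's `urllib` so that it stands alone, although the script above imports `requests`. The helper names `build_text_payload` and `post_text` are hypothetical.

```python
import json
import urllib.request

# Assumption: a WeCom group-robot webhook; the key is a placeholder.
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=<your-robot-key>"


def build_text_payload(content):
    """Return the JSON body for a plain-text robot message."""
    return {"msgtype": "text", "text": {"content": content}}


def post_text(content, url=WEBHOOK_URL, timeout=10):
    """POST the message and return the decoded JSON response."""
    body = json.dumps(build_text_payload(content)).encode("utf-8")
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Inside `send_msg`, a call such as `post_text(message)` would take the place of the placeholder string.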

  • Verify the script

    # Print all active DAGs
    airflow list_dags
    
    # Print all tasks in the 'send_msg' DAG
    airflow list_tasks send_msg
    
    # Print the task hierarchy of the 'send_msg' DAG
    airflow list_tasks send_msg --tree
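
To make the hierarchy concrete: `[third, first] >> second` means `send_msg_2` may start only after both `send_msg_1` and `send_msg_3` have finished. The sketch below is plain Python, not Airflow's scheduler; it groups tasks into waves in which every upstream task is already done. The name `execution_waves` is hypothetical.

```python
# Upstream map for the send_msg DAG: task -> tasks that must finish first.
UPSTREAM = {
    "send_msg_1": [],
    "send_msg_3": [],
    "send_msg_2": ["send_msg_3", "send_msg_1"],
}


def execution_waves(upstream):
    """Group tasks into waves; every task in a wave has all upstreams done."""
    remaining = {task: set(deps) for task, deps in upstream.items()}
    waves = []
    while remaining:
        ready = sorted(t for t, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle detected: not a DAG")
        waves.append(ready)
        for task in ready:
            del remaining[task]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


if __name__ == "__main__":
    print(execution_waves(UPSTREAM))
    # prints [['send_msg_1', 'send_msg_3'], ['send_msg_2']]
```

A cyclic dependency makes the function raise, which mirrors why the graph has to be acyclic.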
  • Sync the dags directory from master to the nodes

    sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node1_ip:/home/airflow/airflow/dags/
    sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node2_ip:/home/airflow/airflow/dags/
  • Startup steps

    # master
    airflow webserver -p 8080
    airflow scheduler
    airflow flower  # default port 5555
    # node1/node2
    airflow worker
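
After starting the services, a quick TCP check can confirm that the webserver (8080) and flower (5555) are listening. This is a minimal sketch using only the standard library; the helper name `is_port_open` is hypothetical, and `127.0.0.1` assumes the check runs on the master itself.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    # Ports from the steps above: webserver 8080, flower 5555.
    for name, port in [("webserver", 8080), ("flower", 5555)]:
        print(name, port, is_port_open("127.0.0.1", port))
```

An open port only shows that something is listening; it does not prove that workers are picking up tasks, which flower's UI shows more directly.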

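Troubleshooting

  • If starting the worker fails with an error like the one below, the problem is in how celery connects to RabbitMQ; uninstalling librabbitmq fixes it.
    Uninstall command:

    pip3 uninstall librabbitmq

    Error: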

    [2020-04-02 14:54:42,279: CRITICAL/MainProcess] Unrecoverable error: SystemError('<built-in method _basic_recv of Connection object at 0x7fc1da870a68> returned a result with an error set',)
    Traceback (most recent call last):
    File "/home/airflow/.local/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
      return on_m(message) if on_m else self.receive(decoded, message)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 571, in on_task_received
      callbacks,
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/strategy.py", line 203, in task_message_handler
      handle(req)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 223, in _process_task_sem
      return self._quick_acquire(self._process_task, req)
    File "/home/airflow/.local/lib/python3.6/site-packages/kombu/asynchronous/semaphore.py", line 62, in acquire
      callback(*partial_args, **partial_kwargs)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 228, in _process_task
      req.execute_using_pool(self.pool)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/request.py", line 652, in execute_using_pool
      correlation_id=task_id,
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/concurrency/base.py", line 158, in apply_async
      **options)
    File "/home/airflow/.local/lib/python3.6/site-packages/billiard/pool.py", line 1530, in apply_async
      self._quick_put((TASK, (result._job, None, func, args, kwds)))
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 885, in send_job
      body = dumps(tup, protocol=protocol)
    TypeError: can't pickle memoryview objects
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
      self.blueprint.start(self)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
      step.start(parent)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
      return self.obj.start()
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
      blueprint.start(self)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
      step.start(parent)
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 599, in start
      c.loop(*c.loop_args())
    File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/loops.py", line 83, in asynloop
      next(loop)
    File "/home/airflow/.local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 364, in create_loop
      cb(*cbargs)
    File "/home/airflow/.local/lib/python3.6/site-packages/kombu/transport/base.py", line 238, in on_readable
      reader(loop)
    File "/home/airflow/.local/lib/python3.6/site-packages/kombu/transport/base.py", line 220, in _read
      drain_events(timeout=0)
    File "/home/airflow/.local/lib/python3.6/site-packages/librabbitmq/__init__.py", line 227, in drain_events
      self._basic_recv(timeout)
    SystemError: <built-in method _basic_recv of Connection object at 0x7fc1da870a68> returned a result with an error set
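
The traceback ends inside the librabbitmq package, which the `amqp://` transport in this Celery generation picks up automatically when it is installed. The sketch below checks whether the package is present and shows, as an alternative to uninstalling, how a broker URL could be rewritten to the `pyamqp://` scheme, which selects the pure-Python client. The helper names `is_installed` and `force_pyamqp` are hypothetical, and whether the rewritten URL resolves the error in this exact setup is an assumption that has not been tested here.

```python
import importlib.util


def is_installed(module_name):
    """Return True if the top-level module can be found without importing it."""
    return importlib.util.find_spec(module_name) is not None


def force_pyamqp(broker_url):
    """Rewrite an amqp:// URL to pyamqp://, leaving other schemes untouched."""
    prefix = "amqp://"
    if broker_url.startswith(prefix):
        return "pyamqp://" + broker_url[len(prefix):]
    return broker_url


if __name__ == "__main__":
    if is_installed("librabbitmq"):
        print("librabbitmq is installed; amqp:// URLs may use the C client")
    else:
        print("librabbitmq is not installed")
```

Running the check on each worker node before starting `airflow worker` shows quickly whether the uninstall step is still needed there.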
    
