Official site:
http://airflow.apache.org/installation.html
https://github.com/apache/airflow
How it works:
https://www.cnblogs.com/cord/p/9450910.html
Installation:
https://www.cnblogs.com/cord/p/9226608.html
High-availability deployment:
https://www.jianshu.com/p/2ecef979c606
Usage:
https://www.jianshu.com/p/cbff05e3f125
Scheduler log location:
/tmp/scheduler.log
# Deploy under a non-root account:
su airflow
Before running any airflow command, activate the virtualenv:
. /home/airflow/venv/bin/activate
# Single-node deployment
https://www.jianshu.com/p/9bed1e3ab93b
1/ Python 3.6.5 environment
https://www.cnblogs.com/hongfeng2019/p/11250989.html
The dependencies are the same as for Superset.
yum install mysql-devel
pip3 install mysqlclient
2/ Install MySQL
https://www.cnblogs.com/hongfeng2019/p/11279488.html
create database airflowdb;
use airflowdb;
GRANT all privileges ON airflowdb.* TO 'airflow'@'localhost' IDENTIFIED BY 'Fengfeng99&';
GRANT all privileges ON airflowdb.* TO 'airflow'@'10.52.%.%' IDENTIFIED BY 'Fengfeng&';
set @@global.explicit_defaults_for_timestamp=on;
This prevents the later error:
"Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
3/ Install Airflow
mkdir -p /root/airflow
vim /etc/profile
export AIRFLOW_HOME=/root/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes
source /etc/profile
pip install apache-airflow[all]
4/ Edit the configuration
vim /root/airflow/airflow.cfg
sql_alchemy_conn = mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb
# if the password contains URL-special characters, URL-encode them here (e.g. & becomes %26)
5/ Initialize the database
airflow initdb
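A quick smoke test of the fresh install, assuming the bundled example DAGs are enabled (load_examples = True, the default):
airflow list_dags                                       # should list the example DAGs without errors
airflow test example_bash_operator runme_0 2019-01-01   # runs one task locally, bypassing the scheduler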
# Cluster-mode deployment:
https://www.jianshu.com/p/bddacfd66c1f
1/ Install Celery
pip install celery
2/ Install Redis (Redis and MySQL have since been moved to UCloud-managed instances)
To run it under a non-root account, see: https://www.cnblogs.com/hongfeng2019/p/11524173.html
wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar xzf redis-4.0.11.tar.gz
mv redis-4.0.11 /opt/
cd /opt/redis-4.0.11    # our copy lives in /hongfeng/software
make
After make finishes, six executables such as redis-server and redis-cli are generated under src/.
redis-server: the Redis server daemon
redis-cli: the Redis command-line client
# Edit redis.conf in the redis-4.0.11 directory
sed -i 's/daemonize no/daemonize yes/g' /opt/redis-4.0.11/redis.conf             # run as a daemon
sed -i 's/bind 127.0.0.1/#bind 127.0.0.1/g' /opt/redis-4.0.11/redis.conf         # allow remote connections
sed -i 's/protected-mode yes/protected-mode no/g' /opt/redis-4.0.11/redis.conf   # allow remote connections (disable protected mode)
Start the Redis server:
cd src/
./redis-server ../redis.conf
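A quick check that Redis is accepting connections; the remote form assumes redis-cli is also available on the Airflow node:
./redis-cli ping                   # expect: PONG
redis-cli -h 10.52.56.119 ping     # run from another node; expect: PONG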
3/ Configure airflow.cfg
# change the executor from LocalExecutor to CeleryExecutor
executor = CeleryExecutor
broker_url = redis://10.52.56.119:6379/0
result_backend = db+mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb
4/ Install the Python redis package
pip install redis
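Optionally confirm that the broker in broker_url is reachable from this node, using the redis package just installed (host/port taken from the config above):
python -c "import redis; print(redis.Redis(host='10.52.56.119', port=6379, db=0).ping())"   # expect: True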
5/ Run Airflow
# start the webserver in the foreground as a quick test
airflow webserver -p 8080
Running the services:
# start the webserver
# in the background: airflow webserver -p 8080 -D
airflow webserver -p 8080
# start the scheduler
# in the background: airflow scheduler -D
airflow scheduler
# start a worker
# in the background: airflow worker -D
# if it reports "address already in use", check whether worker_log_server_port = 8793 is already taken;
# if so, change it to an unused port such as 8974
airflow worker
# start flower (optional)
# in the background: airflow flower -D
airflow flower
If airflow flower fails to start, run pip install flower to install flower first.
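Once a worker and flower are up, the worker should show as online in the flower UI at http://<host>:5555; flower also exposes a small REST API (present in recent flower releases) that can be checked from the shell:
curl -s http://localhost:5555/api/workers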
3. Multi-node high availability
We can use the third-party component airflow-scheduler-failover-controller to make the scheduler highly available.
The steps are as follows:
Download the failover controller
1/ git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
Install it with pip
cd {AIRFLOW_FAILOVER_CONTROLLER_HOME}
pip install -e .
Initialize the failover controller
scheduler_failover_controller init
Note: init appends settings to airflow.cfg, so Airflow must already be installed and initialized first.
2/ Edit the failover configuration
vim /root/airflow/airflow.cfg
scheduler_nodes_in_cluster = host1,host2    # IPs can also be used
Note: the host name can be obtained with the scheduler_failover_controller get_current_host command.
# Number of times to retry starting up the scheduler before it sends an alert
retry_count_before_alerting = 5    # how many times to retry starting the scheduler on the same node before failing over; can be set to 0
Test:
Shut down airflow-1 and check whether the scheduler fails over; watch the switch with tailf /root/airflow/failover.log.
3/ Set up passwordless SSH between the machines running the failover controller; once that is done, verify it with:
scheduler_failover_controller test_connection
Start the failover controller
scheduler_failover_controller start
/data/venv/bin/scheduler_failover_controller start
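After starting, it is worth confirming that exactly one scheduler is running across the cluster; a small sketch using the passwordless SSH set up above, with host1/host2 as placeholders for the node addresses:
for h in host1 host2; do
  echo "== $h =="
  ssh "$h" "ps -ef | grep '[a]irflow scheduler'"
done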
Test:
Stop the primary node (119):
. /data/venv/bin/activate
supervisorctl status
vim /data/venv/etc/supervisord.conf
Failover kicked in, but the scheduler process was not actually pulled back up. Investigation showed that true high availability also requires:
1/ MHA for MySQL
2/ a Redis cluster
# Adding a node to the cluster:
Clone a machine with the same configuration, but in supervisor start only the failover and worker services.
On 119 there are then no worker or scheduler processes; check with ps -ef and via supervisor.
Configuration after rebuilding a server:
Change the hostname
Update /etc/hosts
cd /root/airflow
vim airflow.cfg
Change the following IPs to the new ones:
broker_url = redis://10.52.56.119:6379/0
result_backend = db+mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
sql_alchemy_conn = mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
scheduler_nodes_in_cluster = 10.52.96.246,10.52.143.191
cd /data/venv/etc
vim supervisord.conf
supervisord -c /data/venv/etc/supervisord.conf    # start supervisor
cd /root/airflow/dags
git pull
. /data/venv/bin/activate
Service commands:
supervisorctl start [service_name]     # start a service
supervisorctl status [service_name]    # check service status
Stopping supervisor itself:
(venv) [root@airflow-1 ~]# ps -ef | grep supervisord
root 31684 1 0 Oct29 ? 00:07:07 /data/venv/bin/python3 /data/venv/bin/supervisord -c /data/venv/etc/supervisord.conf
pkill supervisord
or kill -9 31684
# supervisor's own log is at /tmp/supervisord.log
# Syncing the environment to another node
scp -r /opt/cloudera 10.52.41.187:/opt
Airflow web-side changes:
1/ Admin -> Connections: change the host to node1.datalake.opay.com
2/ The $JAVA_HOME problem:
echo $HADOOP_HOME
/opt/cloudera/parcels/CDH/lib/hadoop
cd /opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop
In hadoop-env.sh, explicitly re-declare JAVA_HOME:
export HADOOP_MAPRED_HOME=$( ([[ ! '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce' =~ CDH_MR2_HOME ]] && echo /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce ) || echo ${CDH_MR2_HOME:-/usr/lib/hadoop-mapreduce/} )
export YARN_OPTS="-Xmx838860800 -Djava.net.preferIPv4Stack=true $YARN_OPTS"
export HADOOP_CLIENT_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS"
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64
3/ Log sync: copy logs from node 2 to node 1 under /root/airflow/logs, configured on the master; otherwise the web UI reports "[ERROR] - Failed to open transport (tries_left=3)" because it cannot find the task logs.
vim /hongfeng/script/rsynce_airflow_log.sh
rsync -azuq -e ssh [email protected]:/root/airflow/logs/ /root/airflow/logs/ --exclude 'scheduler' --exclude 'scheduler_failover' --exclude 'dag_processor_manager'
# crontab entry, every 2 minutes:
*/2 * * * * /bin/sh /hongfeng/script/rsynce_airflow_log.sh >/dev/null 2>&1
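The rsync above runs over ssh non-interactively, so key-based login from node 1 to node 2 is assumed; if it is not in place yet:
ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa    # skip if a key already exists
ssh-copy-id [email protected]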
4/ The "ERROR - cannot import name 'constants'" problem
vi /data/venv/lib/python3.6/site-packages/airflow/hooks/hive_hooks.py
Around line 802, switch the HiveServer2 connection from pyhive to impyla:
# from pyhive.hive import connect          # original import, commented out
from impala.dbapi import connect           # use impyla's DB-API driver instead
# Original call, kept commented out for reference:
# return connect(
#     host=db.host,
#     port=db.port,
#     auth_mechanism='PLAIN',
#     auth=auth_mechanism,
#     kerberos_service_name=kerberos_service_name,
#     username=db.login or username,
#     database=schema or db.schema or 'default')
return connect(
    host=db.host,
    port=db.port,
    auth_mechanism='PLAIN',
    user=db.login,
    password=db.password)
Check the thrift-sasl version:
pip list | grep thrift-sasl
thrift-sasl 0.3.0
Downgrade if needed: pip install thrift-sasl==0.2.1
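The patch above imports impala.dbapi, which is provided by the impyla package; if it is not already in the venv (an assumption), install it too:
pip list | grep -i impyla
pip install impyla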
5/ The airflow web service will not start
Check /root/airflow/webserver.log and see whether login/access messages are scrolling.
A scheduler that appears hung can be traced with strace:
strace -p <pid>
nohup strace -o output.txt -T -tt -e trace=all -p 8761 &
nohup strace -o /hongfeng/output.txt -p 32536 &
6/ When there are too many DAG files, the scheduler node becomes slow
[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
# default is 2; raised here
max_threads = 20
Check:
ps -ef | grep schedu
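To apply the new max_threads value, restart the scheduler; on this cluster it is managed by supervisor (service names as in the supervisord.conf further below):
supervisorctl restart airflow_scheduler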
7/ Send a WeChat alert whenever a task-failure email is sent
vi /data/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py
In the email_alert method, add:
# WeChat alert via the ComwxApi plugin shipped with our DAGs
import sys
sys.path.append("/root/airflow/dags")
from plugins.comwx import ComwxApi
comwx = ComwxApi('wwd26d45f97ea74ad2', 'BLE_v25zCmnZaFUgum93j3zVBDK-DjtRkLisI_Wns4g', '1000011')
comwx.postAppMessage(subject, '271')
8/ As the number of tasks grows, the load Airflow puts on Hive increases
Solution: configure a ULB that forwards to multiple Hive nodes in the cluster to spread the load. ULB setup: https://www.cnblogs.com/hongfeng2019/p/11934689.html
ULB address 10.52.151.166, port 10000
On airflow-2:
scp airflow.cfg 10.52.51.8:/root/airflow
cd /data/venv/etc
vim supervisord.conf
Delete the sections:
[program:airflow_flower]
[program:airflow_web]
supervisord -c /data/venv/etc/supervisord.conf
systemctl status mysqld
Routine Airflow maintenance:
(Contributed by my colleague 震乾.)
The big-data Airflow cluster currently runs on two servers:
10.52.56.119
10.52.12.116
Services deployed on each:
10.52.56.119
mysql
redis
airflow_failover
airflow_flower
airflow_scheduler
airflow_web
airflow_worker
10.52.12.116
airflow_failover
airflow_worker
Access:
ip:8080  airflow_web
ip:5555  airflow_flower
The Airflow services are managed by supervisor; routine maintenance commands are as follows.
First enter the Python virtualenv:
. /data/venv/bin/activate
To leave it: deactivate
Service commands:
supervisorctl start [service_name]      # start a service
supervisorctl stop [service_name]       # stop a service
supervisorctl stop all
supervisorctl restart [service_name]    # restart a service
Supervisor config file: /data/venv/etc/supervisord.conf
[program:airflow_web]
command=airflow webserver -p 8080
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/webserver.log

[program:airflow_scheduler]
command=airflow scheduler
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/scheduler.log

[program:airflow_worker]
command=airflow worker
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
environment=C_FORCE_ROOT=true
stdout_logfile=/root/airflow/worker.log

[program:airflow_flower]
command=airflow flower --basic_auth=admin:opay321
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/flower.log

[program:airflow_failover]
command=scheduler_failover_controller start
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/failover.log
Git sync script (pulls the latest DAGs on every Airflow node):
vim /root/deploy_git_airflow.sh
ansible airflow -m shell -a 'cd /root/airflow/dags && git pull'
Airflow connection configuration:
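Before running the sync, a quick reachability check of the airflow inventory group (assumes the group is defined in the Ansible inventory used on this host):
ansible airflow -m ping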