地球人你好!
我正在使用Airflow计划和运行Spark任务。
我这次发现的所有内容都是Airflow可以管理的python DAG。
DAG示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
问题是我的Python代码不好,并且有一些用Java编写的任务。我的问题是如何在python DAG中运行Spark Java jar?或者,也许还有其他方法吗?我发现了 Spark 提交:http://spark.apache.org/docs/latest/submitting-applications.html
但是我不知道如何将所有内容连接在一起。也许有人以前使用过它并有可行的例子。感谢您的时间!
最佳答案
您应该可以使用BashOperator
。保持其余代码不变,导入所需的类和系统软件包:
from airflow.operators.bash_operator import BashOperator
import os
import sys
设置所需的路径:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
并添加运算符:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
dag=dag
)
您可以使用Jinja模板轻松扩展它以提供其他参数。
您当然可以通过使用适合您的情况的模板替换
bash_command
来针对非 Spark 场景进行调整,例如:bash_command = 'java -jar {{ params.jar }}'
并调整
params
。