I'm trying to run a test task on Airflow, but I keep getting the following error:



Here is my Airflow DAG file:

import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.models import DAG

args = {
    'owner': 'raul',
    'start_date': datetime(2018, 11, 12),
    'provide_context': True,
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['raul.gregglino@leroymerlin.ru'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('opus_data',
    default_args=args,
    max_active_runs=6,
    schedule_interval="@daily"
)

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql ',
    hiveconf_jinja_translate=True,
    dag=dag
    )

deps = {}

# Explicitly define the dependencies in the DAG
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)

Here is the content of my HQL file, in case that might be the problem, though I can't tell:
*Note: I'm testing the connection to see whether the table gets created or not; only then will I try to LOAD DATA, hence the LOAD DATA statement is commented out.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED ',';

#LOAD DATA LOCAL INPATH
#'/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
#OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;

Best answer

In the HQL file, it should be FIELDS TERMINATED BY ',':

CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product          STRING,
    id_model            STRING,
    id_attribute        STRING,
    attribute_value     STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Also, comments in an HQL file should start with --, not #.
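For example, the commented-out LOAD DATA block from the question, rewritten with HQL comment syntax, would look like this:

-- LOAD DATA LOCAL INPATH
-- '/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
-- OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;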
Separately, this does not look right and is what triggers the exception: hql='create_import_table_fct_latest_values.hql ' (note the trailing space).
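If the intent was to let Airflow load the file itself, a minimal sketch of the fix is simply to remove the trailing space, since Airflow only resolves the hql value as a template file when it ends with a recognized extension such as .hql (and the file must live somewhere Airflow searches for templates, the DAG folder by default):

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql',  # no trailing space
    hiveconf_jinja_translate=True,
    dag=dag  # reuses the dag object defined in the DAG file above
)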
Take a look at this example:
import os

# Build the full path to the HQL file relative to this DAG file
# (source['hql'] is assumed to hold the HQL file name)
hql_file_path = os.path.join(os.path.dirname(__file__), source['hql'])
print(hql_file_path)

run_hive_query = HiveOperator(
    task_id='run_hive_query',
    dag=dag,
    hql="""
    {{ local_hive_settings }}
    """ + "\n " + open(hql_file_path, 'r').read()
)

See here for more details.

Or put all of the HQL directly into the hql parameter:
hql='CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data ...'
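For instance, a sketch combining the operator from the question with the corrected CREATE TABLE statement from above:

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql="""
    CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
        id_product          STRING,
        id_model            STRING,
        id_attribute        STRING,
        attribute_value     STRING
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
    """,
    hiveconf_jinja_translate=True,
    dag=dag
)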
