Configuring a Google Cloud Storage bucket as the Airflow log folder

This article describes how to configure a Google Cloud Storage bucket as the Airflow log folder.

Problem description

We just started using Apache Airflow for the data pipelines in our project. While exploring its features, we came to know about configuring a remote folder as the log destination in Airflow. For that we:

Created a Google Cloud Storage bucket.
From the Airflow UI, created a new GS connection.

I am not able to understand all the fields. I just created a sample GS bucket under my project from the Google console and gave that project ID to this connection, leaving the key file path and scopes blank. Then I edited the airflow.cfg file as follows:

remote_base_log_folder = gs://my_test_bucket/
remote_log_conn_id = test_gs
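
For reference, in Airflow 1.8 these remote-logging keys live in the [core] section of airflow.cfg, next to the local log folder. A minimal sketch of the relevant section, assuming an otherwise default config (the local path is an example; the bucket and connection ID are the ones from the setup above):

[core]
# Local folder where task logs are written first
base_log_folder = /home/airflow/logs
# Remote settings: finished task logs are uploaded to this bucket
remote_base_log_folder = gs://my_test_bucket/
remote_log_conn_id = test_gs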

After these changes I restarted the web server and the scheduler, but my DAGs are still not writing logs to the GS bucket. I can see logs being created in base_log_folder, but nothing is created in my bucket. Is there any extra configuration needed on my side to get it working?

Note: Using Airflow 1.8. (I faced the same issue with Amazon S3 as well.)

Updated on 20/09/2017

Tried the GS method; attaching a screenshot:

[Screenshot of the GS connection settings; image not reproduced here.]

Still I am not getting logs in the bucket.

Thanks,
Anoop R

Answer

I advise you to use a DAG to connect Airflow to GCP instead of the UI.

First, create a service account on GCP and download the JSON key.
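
Before wiring the key into Airflow, it is worth confirming that it actually grants write access to the log bucket. A minimal sanity-check sketch, assuming the google-cloud-storage client library is installed (the key path and bucket name below are placeholders):

# Assumes: pip install google-cloud-storage
# '/path/to/key.json' and 'my_test_bucket' are example placeholders.
from google.cloud import storage

# Authenticate with the downloaded service-account key
client = storage.Client.from_service_account_json('/path/to/key.json')
bucket = client.bucket('my_test_bucket')

# Upload a small test object, then delete it again
blob = bucket.blob('airflow-log-write-test.txt')
blob.upload_from_string('remote logging test')
blob.delete()
print('Service account can write to the bucket.')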

Then execute this DAG (you can modify the scopes for your access):

import json
from datetime import datetime

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator


def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP."""
    new_conn = Connection(
        conn_id='gcp_connection_id',
        conn_type='google_cloud_platform',
    )
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    conn_extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "<name_of_your_project>",
        "extra__google_cloud_platform__key_path": "<path_to_your_json_key>",
    }
    new_conn.set_extra(json.dumps(conn_extra))

    # Insert the connection only if it does not exist yet
    session = settings.Session()
    if not session.query(Connection).filter(
            Connection.conn_id == new_conn.conn_id).first():
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        print(msg.format(conn_id=new_conn.conn_id))


dag = DAG('add_gcp_connection',
          start_date=datetime(2016, 1, 1),
          schedule_interval='@once')

# Task to add the connection
AddGCPCreds = PythonOperator(
    dag=dag,
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True)

Thanks to Yu Ishikawa for this code.
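
Once this DAG has run once (for example via airflow trigger_dag add_gcp_connection, an Airflow 1.8 CLI command), point the remote logging settings at the connection it created. The conn_id in airflow.cfg has to match exactly, so with the bucket from the question it would look like:

remote_base_log_folder = gs://my_test_bucket/
remote_log_conn_id = gcp_connection_id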
