Airflow won't write logs to S3

This article describes how to deal with Airflow not writing logs to S3; hopefully it is a useful reference if you have run into the same problem.

Problem description

I have tried different ways to configure Airflow 1.9 to write logs to S3, but it simply ignores the settings. I found a lot of people having problems reading the logs after doing so; my problem, however, is that the logs remain local. I can read them without any problem, but they do not show up in the specified S3 bucket.

What I tried first was writing the following into the airflow.cfg file:

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False
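
For reference, in Airflow 1.9 these keys belong to the [core] section of airflow.cfg (which is also why the environment-variable form further down is prefixed with AIRFLOW__CORE__), so the snippet sits under that header:

[core]
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False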

Then I tried to set the environment variables:

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

However, this also gets ignored and the log files remain local.

I run Airflow from a container; I adapted https://github.com/puckel/docker-airflow to my case, but it won't write logs to S3. I use the aws connection to write to buckets in DAGs and that works, but the logs just remain local, no matter whether I run it on an EC2 instance or locally on my machine.
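
Since Airflow runs inside a container here, the AIRFLOW__CORE__* variables also need to reach the container's environment. A hypothetical docker-compose fragment for a puckel/docker-airflow style setup (the service name and file layout are assumptions, not taken from the question):

# docker-compose.yml (fragment, hypothetical)
services:
  webserver:
    image: puckel/docker-airflow
    environment:
      - AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
      - AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
      - AIRFLOW__CORE__ENCRYPT_S3_LOGS=False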

Recommended answer

I finally found an answer using https://stackoverflow.com/a/48969421/3808066, which covers most of the work; I then had to add one more step. I reproduce that answer here and adapt it a bit to reflect what I did:

A few things to check:

  1. Make sure you have the log_config.py file and that it is in the correct directory: ./config/log_config.py.
  2. Make sure you didn't forget the __init__.py file in that directory.
  3. Make sure you defined the s3.task handler and set its formatter to airflow.task.
  4. Make sure you set the handlers of the airflow.task and airflow.task_runner loggers to s3.task.
  5. Set task_log_reader = s3.task in airflow.cfg (a sketch of the matching airflow.cfg entries follows this list).
  6. Pass S3_LOG_FOLDER to log_config. I did that using a variable and retrieving it as in the log_config.py below.
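
The checklist does not spell out how Airflow is told to load log_config.py; in Airflow 1.9 that is normally done with the logging_config_class option, assuming the ./config directory is on the PYTHONPATH so the file is importable as log_config. Under that assumption, the [core] entries for items 5 and 6 might look roughly like this (bucket name and connection id as used above):

[core]
# load the custom logging configuration defined in ./config/log_config.py
logging_config_class = log_config.LOGGING_CONFIG
# let the webserver read task logs through the s3.task handler (item 5)
task_log_reader = s3.task
# picked up by conf.get('core', 'S3_LOG_FOLDER') in log_config.py (item 6)
s3_log_folder = s3://bucketname/logs
# connection with write access to the bucket
remote_log_conn_id = aws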

Here is a log_config.py that works:

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

# read from s3_log_folder under [core] in airflow.cfg
# (or from the environment variable AIRFLOW__CORE__S3_LOG_FOLDER)
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # same as file.task, but also uploads the finished log to S3_LOG_FOLDER
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
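
One extra thing worth checking is that the aws connection used above can write to the bucket at all. Below is a minimal, hypothetical sanity check (the script name and test key are made up; the bucket and connection id are the ones from the question) using Airflow's S3Hook, which is also what the S3 task handler uses under the hood:

# check_s3_logging.py -- hypothetical helper, not part of the answer above
from airflow.hooks.S3_hook import S3Hook

hook = S3Hook(aws_conn_id='aws')

# write a small test object under the remote log prefix
hook.load_string(
    string_data='airflow remote logging test',
    key='logs/_connection_test.txt',
    bucket_name='bucketname',
    replace=True,
)

# True if the object is visible, i.e. the connection can write to the bucket
print(hook.check_for_key('logs/_connection_test.txt', bucket_name='bucketname'))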

Note that this way S3_LOG_FOLDER can be specified in airflow.cfg or as the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
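
For example, either of the following makes conf.get('core', 'S3_LOG_FOLDER') in the log_config.py above resolve to the bucket path (bucket name as used throughout):

# in airflow.cfg, under [core]:
s3_log_folder = s3://bucketname/logs

# ...or as an environment variable, e.g. in the container definition:
AIRFLOW__CORE__S3_LOG_FOLDER=s3://bucketname/logs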

This concludes this article on Airflow not writing logs to S3. We hope the recommended answer is helpful.
