Problem description
I have tried different ways to configure Airflow 1.9 to write logs to S3, but it just ignores them. I found a lot of people having problems reading the logs after doing so; my problem, however, is that the logs remain local. I can read them without any problem, but they are not in the specified S3 bucket.
What I tried first was to write this into the airflow.cfg file:
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False
Then I tried to set environment variables
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
However, it gets ignored and the log files remain local.
I run Airflow from a container; I adapted https://github.com/puckel/docker-airflow to my case, but it won't write logs to S3. I use the aws connection to write to buckets in DAGs and that works, but the logs just remain local, no matter whether I run it on an EC2 instance or locally on my machine.
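To show what I mean, here is a minimal sketch of the kind of task that does write to the bucket through that same aws connection (the DAG name, task name, and object key are just illustrative, and it assumes Airflow 1.9's boto3-based S3Hook):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.S3_hook import S3Hook


def upload_to_s3():
    # same 'aws' connection id that remote logging is supposed to use
    hook = S3Hook(aws_conn_id='aws')
    hook.load_string('connection check', key='logs/connection_check.txt',
                     bucket_name='bucketname', replace=True)


dag = DAG('s3_connection_check', start_date=datetime(2018, 1, 1),
          schedule_interval=None)

write_task = PythonOperator(task_id='write_to_bucket',
                            python_callable=upload_to_s3, dag=dag)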
Recommended answer
I finally found an answer using https://stackoverflow.com/a/48969421/3808066, which covers most of the work; I then had to add one more step. I reproduce that answer here and adapt it a bit to the way I did it:
Some things to check:
- Make sure you have the log_config.py file and that it is in the correct dir: ./config/log_config.py.
- Make sure you didn't forget the __init__.py file in that dir.
- Make sure you defined the s3.task handler and set its formatter to airflow.task.
- Make sure you set the handlers of the airflow.task and airflow.task_runner loggers to s3.task.
- Set task_log_reader = s3.task in airflow.cfg (see the airflow.cfg sketch right after this list).
- Pass the S3_LOG_FOLDER to log_config. I did that using a variable and retrieving it as in the log_config.py further below.
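For reference, here is a sketch of the airflow.cfg entries these steps touch. The task_log_reader and remote_log_conn_id values come from the list above and from my original config; the logging_config_class line is what points Airflow 1.9 at the custom logging config and assumes the directory holding log_config.py is importable (on the PYTHONPATH):

[core]
# load the custom logging config defined in ./config/log_config.py
logging_config_class = log_config.LOGGING_CONFIG
# read task logs back through the s3.task handler
task_log_reader = s3.task
# connection id used by the S3 handler, as in the question
remote_log_conn_id = aws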
Here is a log_config.py that works:
import os

from airflow import configuration as conf

# pull the standard settings from airflow.cfg / environment variables
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

# custom setting: the S3 destination for task logs (see the note below)
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # handler that writes task logs locally and uploads them to S3
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        # task and task-runner logs go through the s3.task handler
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
Note that this way, S3_LOG_FOLDER can be specified in airflow.cfg or as the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
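For example, either of the following is picked up by the conf.get('core', 'S3_LOG_FOLDER') call in log_config.py (the lower-case key spelling in airflow.cfg is my assumption; Airflow treats option names case-insensitively):

# in airflow.cfg, under [core]
s3_log_folder = s3://bucketname/logs

# or as an environment variable, in the same style as in the question
AIRFLOW__CORE__S3_LOG_FOLDER=s3://bucketname/logs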