我是Airflow的新手。我已经阅读了几次文档,在网上遇到许多S/O问题和许多随机文章,但尚未解决此问题。我觉得这很简单,我做错了。
我有适用于Windows的Docker,我提取了puckel/docker-airflow图像并运行了一个带有暴露端口的容器,以便可以从主机访问UI。我还有另一个运行mcr.microsoft.com/mssql/server的容器,在该容器上还原了WideWorldImporters示例数据库。通过Airflow UI,我已经能够成功创建与此数据库的连接,甚至可以从“数据分析”部分中查询它。查看下面的图像:
Connection Creation
Successful Query to Connection

因此,尽管这可行,但我的dag在第二个任务sqlData上失败了。这是代码:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime

copyData = DAG(
    dag_id='copyData',
    schedule_interval='@once',
    start_date=datetime(2019,1,1)
)


printHelloBash = BashOperator(
    task_id = "print_hello_Bash",
    bash_command = 'echo "Lets copy some data"',
    dag = copyData
)

mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
                       task_id="select_some_data",
                       mssql_conn_id=mssqlConnection,
                       database="WideWorldImporters",
                       dag = copyData,
                       depends_on_past=True
          )

queryDataSuccess = BashOperator(
    task_id = "confirm_data_queried",
    bash_command = 'echo "We queried data!"',
    dag = copyData
)

printHelloBash >> sqlData >> queryDataSuccess

最初的错误是:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
    _fernet = Fernet(fernet_key.encode('utf-8'))
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)
  File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
    return b64decode(s)
  File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*

我注意到这与加密有关,因此我继续运行pip install cryptographypip install airflow[crytpo],它们都返回了完全相同的结果,通知我该要求已经得到满足。终于,我发现一些东西说我只需要生成一个fernet_key即可。我的airflow.cfg文件中的默认 key 为fernet_key = $FERNET_KEY。因此,从容器中的cli中我运行了:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

并得到了我替换为$FERNET_KEY的代码。我重新启动了容器,然后重新运行了dag,现在我的错误是:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
    h.verify(data[-32:])
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
    ctx.verify(signature)
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
    raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.

最初的加密doc扫描中的哪一项与兼容性有关?

我现在迷茫了,决定决定问这个问题,看看我是否有可能在解决这个问题上走错了路。任何帮助将不胜感激,因为 Airflow 似乎很棒。

最佳答案

感谢@Tomasz的一些侧面交流,我终于让DAG可以工作了。他建议我尝试使用docker-compose,该工具也在puckel/docker-airflow github存储库中列出。我最终使用了docker-compose-LocalExecutor.yml文件而不是Celery Executor。我进行了一些小的故障排除和更多的配置。首先,我使用了现有的MSSQL容器,该容器中包含示例数据库,并使用docker commit mssql_container_name将其转换为镜像。我这样做的唯一原因是节省了必须还原备份样本数据库的时间。您可以随时将备份复制到容器中,并在以后根据需要还原它们。然后,将新镜像添加到现有的docker-compose-LocalExecutor.yml文件中,如下所示:

version: '2.1'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow

    mssql:
        image: dw:latest
        ports:
            - "1433:1433"

    webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        depends_on:
            - postgres
            - mssql
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
        #volumes:
            #- ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

请注意, dw 是我命名的基于mssql容器的新镜像的名称。接下来,我将文件重命名为 docker-compose.yml ,以便可以轻松运行docker-compose up(不确定是否有直接指向另一个YAML文件的命令)。一切启动并运行后,我导航到Airflow UI并配置了我的连接。注意:由于您使用的是docker-compose,因此您无需知道其他容器的IP地址,因为它们使用的是DNS服务发现功能,而我发现这是关于here的。然后,为了测试连接,我去了Data Profiling进行了临时查询,但是该连接不存在。这是因为puckel/docker-airflow镜像未安装 pymssql 。因此,只需将bash放入容器docker exec -it airflow_webserver_container bash并安装pip install pymssql --user即可。退出容器并使用docker-compose restart重新启动所有服务。一分钟后,一切就绪并开始运行。我的连接显示在“临时查询”中,我可以成功选择数据。最后,我打开了DAG,调度程序将其选中,一切都成功了!经过数周的谷歌搜索, super 放心。感谢@ y2k-shubham的帮助和对@Tomasz的 super 感谢,我在他关于r/datascience subreddit上关于Airflow的精彩而透彻的发布之后,实际上是我最初与之接触的。

关于python - 尝试查询mssql数据库时出现 Airflow Fernet_Key问题,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54831314/

10-16 17:10
查看更多