我目前在Google Cloud GCP的Kubernetes上运行Airflow。我的项目基于docker-airflow。我能够启动UI,但是当我尝试为Google Cloud创建连接并提交连接时,出现以下错误。

  • ValueError: Fernet key must be 32 url-safe base64-encoded bytes.
    
  • [2018-09-21 19:45:13,345] AirflowException: Could not create Fernet object: Fernet key must be 32 url-safe base64-encoded bytes.
    

  • docs推荐的第一个问题是确保您已安装密码,这是我要做的。我安装了两种类型,一种带有气流,一种来自PyPi。
    pip3 install apache-airflow[kubernetes,crypto] and also tried
    pip install cryptography
    

    我尝试按照文档here中的说明运行用于生成和存储环境变量的命令。 (如下图所示)

    1)手动生成一个Fernet密钥并添加到airflow.cfg

    2)设置环境变量并重新启动服务器。
    python -c "from cryptography.fernet import Fernet;
    print(Fernet.generate_key().decode())"
    

    示例密钥:81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
    使用kubernetes我无法使用关闭进程ID的典型方法来重启服务器,因为它与容器相关。我还尝试将生成的密钥(上方)放在kubernetes集群的configmaps.yaml文件中(部署时等于airflow.cfg)。

    我尝试通过DAG,通过UI以及使用airflow命令行客户端手动运行GCP连接。这三种方法均返回相同的错误。我在此处包括UI提交的图片以及完整的堆栈跟踪。


  • 为什么会这样?是否未生成Fernet密钥?它可能不会保存在基础卷上吗?*

  • 谢谢您的帮助。

    -RR
    Traceback (most recent call last):
    
    File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 159, in get_fernet
    _fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
    File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 37, in __init__
    "Fernet key must be 32 url-safe base64-encoded bytes."
    ValueError: Fernet key must be 32 url-safe base64-encoded bytes.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in wsgi_app
      response = self.full_dispatch_request()
    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in full_dispatch_request
      rv = self.handle_user_exception(e)
        File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in handle_user_exception
      reraise(exc_type, exc_value, tb)
    File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
      raise value
    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in full_dispatch_request
      rv = self.dispatch_request()
    File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in dispatch_request
      return self.view_functions[rule.endpoint](**req.view_args)
    File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/security/decorators.py", line 26, in wraps
      return f(self, *args, **kwargs)
    File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/views.py", line 524, in edit
      widgets = self._edit(pk)
    File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/baseviews.py", line 965, in _edit
      form.populate_obj(item)
    File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 96, in populate_obj
      field.populate_obj(obj, name)
    File "/usr/local/lib/python3.6/site-packages/wtforms/fields/core.py", line 330, in populate_obj
      setattr(obj, name, self.data)
    File "<string>", line 1, in __set__
    File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 731, in set_extra
      fernet = get_fernet()
    File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 163, in get_fernet
    raise AirflowException("Could not create Fernet object: {}".format(ve))
    airflow.exceptions.AirflowException: Could not create Fernet object:
    Fernet key must be 32 url-safe base64-encoded bytes.
    

    这是基础持久卷的YAML。
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: airflow-dags
      namespace: data
    spec:
      accessModes:
        - ReadOnlyMany
      storageClassName: standard
      resources:
        requests:
          storage: 8Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: airflow-logs
      namespace: data
    spec:
      accessModes:
        - ReadOnlyMany
      storageClassName: standard
      resources:
        requests:
          storage: 8Gi
    

    这是气流配置YAML。
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: airflow
      namespace: data
      labels:
        name: airflow
    spec:
      replicas: 1
      selector:
        matchLabels:
          name: airflow
      template:
        metadata:
          labels:
            name: airflow
        spec:
          serviceAccountName: spark-service-account
          automountServiceAccountToken: true
          initContainers:
          - name: "init"
            image: <image_name>
            imagePullPolicy: Always
            volumeMounts:
            - name: airflow-configmap
              mountPath: /root/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /root/airflow/dags
            # - name: test-volume
            #   mountPath: /root/test_volume
            env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            command:
              - "bash"
            args:
              - "-cx"
              - "airflow initdb || true && airflow create_user -u airflow -l airflow -f jon -e [email protected] -r Admin -p airflow || true"
          containers:
          - name: webserver
            image: <image_name>
            imagePullPolicy: IfNotPresent
            ports:
            - name: webserver
              containerPort: 8080
            env:
            - name: <namespace_name>
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            command: ["/bin/sh", "-c"]
            args: ["airflow webserver"]
            volumeMounts:
            - name: airflow-configmap
              mountPath: /root/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /root/airflow/dags
            - name: airflow-logs
              mountPath: /root/airflow/logs
            # readinessProbe:
            #   initialDelaySeconds: 5
            #   timeoutSeconds: 5
            #   periodSeconds: 5
            #   httpGet:
            #     path: /login
            #     port: 8080
            # livenessProbe:
            #   initialDelaySeconds: 5
            #   timeoutSeconds: 5
            #   failureThreshold: 5
            #   httpGet:
            #     path: /login
            #     port: 8080
          - name: scheduler
            image: image-name
            imagePullPolicy: IfNotPresent
            env:
            - name: namespace_name
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            command: ["/bin/sh", "-c"]
            args: ["cp ./dags/* /root/airflow/dags/; airflow scheduler"]
            volumeMounts:
            - name: airflow-configmap
              mountPath: /root/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /root/airflow/dags
            - name: airflow-logs
              mountPath: /root/airflow/logs
          volumes:
          - name: airflow-configmap
            configMap:
              name: airflow-configmap
          - name: airflow-dags
            persistentVolumeClaim:
              claimName: airflow-dags
          - name: airflow-logs
            persistentVolumeClaim:
              claimName: airflow-logs
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: airflow
      namespace: data
    spec:
      type: NodePort
      ports:
        - port: 8080
          nodePort: 30809
      selector:
        name: airflow
    

    最佳答案

    重新启动工作服务器和Web服务器。

    您的工作人员和Web服务器正在使用旧的Fernet密钥进行操作。您在配置中更改了密钥,因此所有新存储或修改的Connections都将使用新密钥,但是Web服务器/工作人员仍在使用旧密钥。它们将永远不会匹配,并继续给出此错误,直到重新启动为止。

    关于python - Kubernetes的 Airflow GCP连接问题-Fernet key 必须是32个URL安全的base64编码的字节,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52452476/

    10-15 19:15