I get the following error when trying to use the Docker operator in Airflow. The airflow setup is not visible to me (it is running by another team on a machine I cannot access and the responsible team is not responsive). I created the docker image from a docker file I wrote myself. The name cmprod refers to the docker image.
ImageNotFound: 404 Client Error: Not Found ("pull access denied for cmprod, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")
I am unfamiliar with the use of docker login and I am not sure if it applies in this case since I am able to run some images and not others.At first I though I incorrectly typed the name of the docker image but I checked and double checked. Below is the output of docker images
. I was able to run the image condatest successfully through airflow.
cm_prod latest 08f408557eb7 15 hours ago 2.12GB
cmprod latest 08f408557eb7 15 hours ago 2.12GB
<none> <none> 4af8c991ea19 15 hours ago 730MB
<none> <none> 9da4759a3316 15 hours ago 64.2MB
condatest latest e24563f9bb48 5 days ago 2.12GB
I thought I might be using the docker operator incorrectly but I am able to run some other images. I thought maybe there was an airflow configuration issue where certain operating systems were not allowed or running with certain permissions was not allowed but I have been unable to find any documentation on whether this is possible.
My testing does not show any of the above factors to determine 100% whether a docker image can or cannot be found by airflow using the docker operator. This problem does not seem amenable to trial and error. Any advice on what may be happening would be appreciated.
I am able to see the airflow UI in my browser and trigger dags and there is a shared directory where I can dump my dag specification script. Airflow is Version : 1.10.3.
The version info for docker follows docker version
Client: Docker Engine - Community
Version: 19.03.6
API version: 1.40
Go version: go1.12.16
Git commit: 369ce74a3c
Built: Thu Feb 13 01:29:29 2020
OS/Arch: linux/amd64
Experimental: false
Server: Docker Engine - Community
Version: 19.03.6
API version: 1.40 (minimum version 1.12)
Go version: go1.12.16
Git commit: 369ce74a3c
Built: Thu Feb 13 01:28:07 2020
OS/Arch: linux/amd64
Experimental: false
Version: 1.2.10
GitCommit: b34a5c8af56e510852c35414db4c1f4fa6172339
Version: 1.0.0-rc8+dev
GitCommit: 3e425f80a8c931f88e6d94a8c831b9d5aa481657
Version: 0.18.0
GitCommit: fec3683
The airflow DAG code was requested. I am hesitant to post the whole thing because I inherited some code from a team member who left and I feel like some of the code in the dag would be best implemented as a separate script. Below are the most relevant code blocks. Let me know if anything seems missing. There is a section between these blocks I omit for clarity but can include if nothing seems to work.
from functools import reduce
import os, os.path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.helpers import chain
CODE BLOCK 2: DAG and OPERATOR Instantiation
# create SQL operators
def create_SQL_operator(taskfile, dag):
Creates a MsSQL operator for a given DAG.
op = MsSqlOperator(
sql=readSQL(os.path.join(ProjDir, taskfile)),
return op
# Airflow arguments
default_args = {
'owner': 'airflow',
'description': 'Parallel SQL DAG',
'depend_on_past': False,
'start_date': datetime(2020, 1, 1),
'email': ['*PERSONTOEMAIL*'],
'email_on_failure': False,
'email_on_retry': True
# DAG definition
DAG = DAG(ProjName + '_and_infer',
description='Running parallel SQLs for project: {} and inference on the data'.format(ProjName),
schedule_interval=CronTime, # '0 */2 * * *', #every 2 hours
concurrency=50, # setup to allow 50 concurrent parallel tasks
t_predict = DockerOperator(
command='bash inference.sh ',
# Create SQL task operators in Airflow global space
ops = []
ops = [(order, create_SQL_operator(taskfile, DAG)) for order, taskfile in sql_rank]
ops.sort(key=lambda tup: tup[0])
# create cluster ops list
from itertools import groupby
from operator import itemgetter
opsList = []
opsList = [[j for i, j in grouper] for order, grouper in groupby(ops, key=itemgetter(0))]
# flatten list with only 1 element: Airflow chain() cannot accept list of lists!!
chainList = []
chainList = [reduce(plus, list) if len(list) == 1 else list for list in opsList]
# create final DAG graph
exec(r' >> '.join([r'chainList['+str(i)+r']' for i in range(len(chainList))]))
UPDATESince I originally posted this question I substituted the condatest image into the above code and managed to error out in a different way: there was a missing shell script in the mounted directory.
When I copied the missing file and ran again airflow could no longer find the condatest image. I checked and saw that the newly copied script did not have execute permission and added the permission. Airflow still could not find the previously working docker container.
I deleted the shell script and airflow can find the container again. Does this mean the problem has to do with Linux permissions? It is unclear to me how the contents of the mounted drive affect the ability of airflow to detect the container. Furthermore, I know I was able to run that same script using a docker container started by a dockerobject in airflow in the past.
After upgrading airflow to airflow2 the logs have provided some additional information. Airflow had been configured to run on multiple servers and docker had been set up on each server but no image registry had been used. It seems that when the job scheduler tried to execute the dag on a server other than the server where I built my docker image the image was unavailable. It seems the workarounds I was finding earlier were just coincided with a lucky draw for what server my job was scheduled on.
To resolve this we have configured our scheduler to use only one server.
