diff --git a/.gitignore b/.gitignore index 677b580..bab9224 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ dags/ plugins/ logs/ +config/ project_data/ diff --git a/docker-compose.yaml b/docker-compose.yaml index 64e1f56..6598b8c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,7 +24,7 @@ # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. -# Default: apache/airflow:2.6.0 +# Default: apache/airflow:3.0.4 # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 # AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. @@ -44,27 +44,24 @@ # # Feel free to modify this file to suit your needs. --- -version: '3.8' x-airflow-common: &airflow-common - # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # In order to add custom dependencies or upgrade provider distributions you can use your extended image. # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0} + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.4} # build: . 
environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow - # For backward compatibility, with Airflow <2.3 - AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'true' - AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' - AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 300 + AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/' # yamllint disable rule:line-length # Use simple http server on scheduler for health checks # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server @@ -73,9 +70,12 @@ x-airflow-common: # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. 
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + # The following line can be used to set a custom config file, stored in the local config folder + AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' volumes: - ./project_data:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" depends_on: @@ -102,7 +102,9 @@ services: restart: always redis: - image: redis:latest + # Redis is limited to 7.2-bookworm due to licencing change + # https://redis.io/blog/redis-adopts-dual-source-available-licensing/ + image: redis:7.2-bookworm expose: - 6379 healthcheck: @@ -113,13 +115,13 @@ services: start_period: 30s restart: always - airflow-webserver: + airflow-apiserver: <<: *airflow-common - command: webserver + command: api-server ports: - "8080:8080" healthcheck: - test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"] interval: 30s timeout: 10s retries: 5 @@ -145,13 +147,29 @@ services: airflow-init: condition: service_completed_successfully + airflow-dag-processor: + <<: *airflow-common + command: dag-processor + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + airflow-worker: <<: *airflow-common command: celery worker healthcheck: + # yamllint disable rule:line-length test: - "CMD-SHELL" - - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' 
interval: 30s timeout: 10s retries: 5 @@ -164,6 +182,8 @@ services: restart: always depends_on: <<: *airflow-common-depends-on + airflow-apiserver: + condition: service_healthy airflow-init: condition: service_completed_successfully @@ -189,20 +209,6 @@ services: command: - -c - | - function ver() { - printf "%04d%04d%04d%04d" $${1//./ } - } - airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) - airflow_version_comparable=$$(ver $${airflow_version}) - min_airflow_version=2.2.0 - min_airflow_version_comparable=$$(ver $${min_airflow_version}) - if (( airflow_version_comparable < min_airflow_version_comparable )); then - echo - echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" - echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" - echo - exit 1 - fi if [[ -z "${AIRFLOW_UID}" ]]; then echo echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" @@ -211,6 +217,7 @@ services: echo "For other operating systems you can get rid of the warning with manually created .env file:" echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" echo + export AIRFLOW_UID=$$(id -u) fi one_meg=1048576 mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) @@ -245,20 +252,47 @@ services: echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" echo fi - mkdir -p /sources/logs /sources/dags /sources/plugins - chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} - exec /entrypoint airflow version + echo + echo "Creating missing opt dirs if missing:" + echo + mkdir -v -p /opt/airflow/{logs,dags,plugins,config} + echo + echo "Airflow version:" + /entrypoint airflow version + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + echo + echo "Running airflow config list to create default 
config file if missing." + echo + /entrypoint airflow config list >/dev/null + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + echo + echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0" + echo + chown -R "${AIRFLOW_UID}:0" /opt/airflow/ + echo + echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0" + echo + chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config} + echo + echo "Files in shared volumes:" + echo + ls -la /opt/airflow/{logs,dags,plugins,config} + # yamllint enable rule:line-length environment: <<: *airflow-common-env - _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_DB_MIGRATE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} _PIP_ADDITIONAL_REQUIREMENTS: '' user: "0:0" - volumes: - - ${AIRFLOW_PROJ_DIR:-.}:/sources airflow-cli: <<: *airflow-common @@ -272,6 +306,8 @@ services: - bash - -c - airflow + depends_on: + <<: *airflow-common-depends-on # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up # or by explicitly targeted on the command line e.g. docker-compose up flower. 
@@ -306,7 +342,6 @@ services: DIRECTORY_NAME: ${DIRECTORY_NAME:-project} DESTINATION_PATH: ${DESTINATION_PATH:-/app/sync} INTERVAL: ${INTERVAL:-10} volumes: postgres-db-volume: - diff --git a/examples/dags/sample_bash_operator_dag.py b/examples/dags/sample_bash_operator_dag.py index a61b045..704510c 100644 --- a/examples/dags/sample_bash_operator_dag.py +++ b/examples/dags/sample_bash_operator_dag.py @@ -1,25 +1,22 @@ -from airflow import DAG from datetime import datetime, timedelta -from airflow.operators.bash_operator import BashOperator -from airflow.operators.docker_operator import DockerOperator +from airflow import DAG +from airflow.providers.standard.operators.bash import BashOperator default_args = { - 'owner' : 'Mostafa Ghadimi', - 'description' : 'Use of the DockerOperator', - 'depend_on_past' : False, - 'start_date' : datetime(2023, 4, 4), - 'email_on_failure' : False, - 'email_on_retry' : False, - 'retries' : 1, - 'retry_delay' : timedelta(minutes=5) + "owner": "Mostafa Ghadimi", + "description": "Use of the BashOperator", + "depends_on_past": False, + "start_date": datetime(2023, 4, 4), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), } -with DAG('docker_dag_sample', default_args=default_args, schedule_interval="5 10 * * *", catchup=False) as dag: - t1 = BashOperator( - task_id='print_hello', - bash_command='echo "hello world"' - ) +with DAG( + "bash_dag_sample", default_args=default_args, schedule="5 10 * * *", catchup=False +) as dag: + t1 = BashOperator(task_id="print_hello", bash_command='echo "hello world"') t1 - diff --git a/examples/dags/sample_docker_operator_dag.py b/examples/dags/sample_docker_operator_dag.py index ccdaeef..8d29d19 100644 --- a/examples/dags/sample_docker_operator_dag.py +++ b/examples/dags/sample_docker_operator_dag.py @@ -1,30 +1,33 @@ -from airflow import DAG from datetime import datetime, timedelta -from airflow.operators.bash_operator import BashOperator 
-from airflow.operators.docker_operator import DockerOperator +from airflow import DAG +from airflow.providers.docker.operators.docker import DockerOperator default_args = { - 'owner' : 'Mostafa Ghadimi', - 'description' : 'Use of the DockerOperator', - 'depend_on_past' : False, - 'start_date' : datetime(2023, 4, 23), - 'email_on_failure' : False, - 'email_on_retry' : False, - 'retries' : 1, - 'retry_delay' : timedelta(minutes=5) + "owner": "Mostafa Ghadimi", + "description": "Use of the DockerOperator", + "depends_on_past": False, + "start_date": datetime(2023, 4, 23), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), } -with DAG('docker_dag_v2', default_args=default_args, schedule_interval="5 10 * * *", catchup=False) as dag: +with DAG( + "docker_dag_v2", + default_args=default_args, + schedule="5 10 * * *", + catchup=False, +) as dag: t1 = DockerOperator( - task_id='docker_command', - image='alpine:latest', - api_version='auto', - auto_remove=True, + task_id="docker_command", + image="alpine:latest", + api_version="auto", + auto_remove="success", command="/bin/sleep 30", docker_url="unix://var/run/docker.sock", - network_mode="bridge" + network_mode="bridge", ) t1 -