Developing a CI/CD Practice for Google Cloud Composer

Promoting a DevOps culture has become essential to good communication and collaboration within a software team. Establishing a standard Continuous Integration/Continuous Delivery (CI/CD) practice for Google Cloud Composer has a variety of benefits, including:

  • Faster development cycles for Airflow DAGs
  • A uniform repo structure across the team
  • Fewer errors when automating DAG deployments
  • Faster debugging in the event of a failure

I had the privilege of working with OpenX, a programmatic advertising company, to build a CI/CD solution for Cloud Composer. This article is intended for anyone who has a working knowledge of Cloud Composer, Jenkins, and Spinnaker and wants a high-level understanding of the CI/CD solution implemented at OpenX. It focuses on the structure and automation of DAG deployments rather than on setting up the Spinnaker pipeline.

Cloud Composer Challenges

Airflow represents a workflow as a Directed Acyclic Graph (DAG), a collection of tasks that run in the Cloud Composer environment. Before a DAG is uploaded to the Composer environment, the following conditions must be met for the deployment to succeed:

  • No syntax errors in a DAG file
  • Airflow variables referenced in the DAG file must exist in the Cloud Composer environment
  • Airflow connections used in the DAG file must exist in the Cloud Composer environment

Overwriting a DAG file is challenging because of how data synchronization is implemented in Cloud Composer. Modified DAG files are synced one way, from GCS to the /home/airflow/gcs/dags directory on the workers and the webserver. This periodic synchronization process is eventually consistent.

  • Minor modifications to task parameters are picked up quickly by all workers and the scheduler. Significant changes to the structure of a DAG, such as adding new tasks or changing the schedule interval, take longer to sync.
  • Eventual consistency makes overwriting a DAG non-deterministic. If a DAG is already running, an Airflow worker could run either the old or the new version of the DAG partway through the run, so there is no way to know which version is being executed.
  • The older version of the DAG already has metadata stored in the Airflow database. Continuing to append the new version's metadata to it is not ideal, especially if the start date or schedule interval changes.

For these reasons, overwriting a DAG is problematic and best avoided. Instead, when modifying a DAG, the best practice is to rename it and deploy the changes as a new DAG. This clearly distinguishes the older and newer versions of the DAG.

Note that DAGs cannot simply be stopped and started. Uploading a newer version of a DAG requires a sequence of steps that keep the Cloud Composer environment in sync. Refer to the deploy script section for how this process is automated.

This is the high-level architecture of the whole CI/CD process. There are two GitHub repos: the airflow-deployer repo contains the deploy script that automates Airflow deployment, and the airflow-dags repo contains all the artifacts needed to run a DAG. Both repos have GitHub webhooks configured to trigger Jenkins, so when a new commit is pushed to the master branch, Jenkins starts a build. At the end of the Jenkins build, the resulting Docker image contains the following files and software:

  • Local Apache Airflow
  • Google Cloud SDK
  • Python3
  • Clone of team’s Airflow repo (master branch)
  • Deploy script file
  • DAG validation file

Once the Docker image is built, Spinnaker starts the pipeline and closely interacts with the deploy script to deploy DAGs to the Cloud Composer environment.

Airflow Repo Structure

There is currently no standardized practice for structuring an Airflow repo. Below is the structure this pipeline assumes. Enforcing it gives developers a consistent code organization and helps the deploy script find the right files during deployment. A new configuration file, running_dags.txt, lists the DAGs that should be deployed; only the DAGs in that file are deployed during the next iteration. The structure takes the opinionated approach of version controlling your variables with your DAG code for ease of integration, testing, and clarity.

| 
|__ sample_dag_v1_2_1.py
|__ variables/
|   |__ variables-devint.json
|   |__ variables-qa.json
|   |__ variables-prod.json
|   |__ README.md
|__ plugins/
|   |__ example_plugin/
|       |__ __init__.py
|       |__ sensors/
|       |   |__ __init__.py
|       |   |__ sample_sensor.py
|       |__ operators/
|       |   |__ __init__.py
|       |   |__ sample_operator.py
|       |__ hooks/
|           |__ __init__.py
|           |__ sample_hook.py
|__ README.md
|__ running_dags.txt

This example repo structure must be followed consistently across the team for DAGs to deploy properly. Our solution relies on the repo following these rules:

  • All DAG files should exist at the top level of the repo.
    • Each dag_id should match its filename without the .py extension.
    • Each dag_id should conform to the regex pattern enforced by the deploy script.
  • The variables directory should contain a variables file for each environment.
  • Any custom sensors, operators, and hooks should live in the plugins directory as a plugin.
  • The running_dags.txt configuration file contains a list of dag_ids that should be running in Cloud Composer at the end of the next deployment.
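These rules are easy to check before a commit ever reaches Jenkins. The sketch below is a hypothetical lint step, not part of the OpenX deploy script: check_repo_layout is an illustrative name, DAGNAME_REGEX is an assumed <name>_v<major>_<minor>_<patch> pattern, and the environment names come from the repo tree above. It only inspects file names; it does not parse the Python files to confirm that each DAG declares a dag_id equal to its filename.

```shell
#!/usr/bin/env bash
# Assumed dag_id pattern: <name>_v<major>_<minor>_<patch>. Replace it with the
# pattern your deploy script actually enforces.
DAGNAME_REGEX='^.+_v[0-9]+_[0-9]+_[0-9]+$'

# check_repo_layout REPO_DIR
# Hypothetical lint step: prints one line per rule violation and returns 1 if
# any rule is broken, 0 otherwise.
check_repo_layout() {
    local repo="$1" status=0 file dag_id env

    # Top-level .py files are DAGs; their names (the dag_ids) must match.
    for file in "$repo"/*.py; do
        [ -e "$file" ] || continue   # glob matched nothing
        dag_id=$(basename "$file" .py)
        if ! grep -qiE "$DAGNAME_REGEX" <<< "$dag_id"; then
            echo "DAG filename does not match the naming convention: $file"
            status=1
        fi
    done

    # Every dag_id listed in running_dags.txt needs a top-level DAG file.
    if [ ! -f "$repo/running_dags.txt" ]; then
        echo "missing running_dags.txt"
        status=1
    else
        while read -r dag_id _; do
            case "$dag_id" in ''|'#'*) continue ;; esac   # blank or comment
            if [ ! -f "$repo/$dag_id.py" ]; then
                echo "no DAG file for dag_id in running_dags.txt: $dag_id"
                status=1
            fi
        done < "$repo/running_dags.txt"
    fi

    # One variables file per environment (names taken from the repo tree).
    for env in devint qa prod; do
        if [ ! -f "$repo/variables/variables-$env.json" ]; then
            echo "missing variables/variables-$env.json"
            status=1
        fi
    done

    return "$status"
}
```

Running a check like this in the Jenkins build, or as a Git pre-commit hook, would surface naming mistakes before Spinnaker tries to deploy them.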

DAG Versioning

For this deployment process, DAG definition files follow the naming convention <dag_name>_v<major>_<minor>_<patch>.py, and the dag_id is the filename without the .py extension. This convention was chosen to fit OpenX's requirements; the regex pattern can vary by use case. A standard naming convention across the whole CI/CD process makes it easy to identify and keep track of DAG versions.

In a rollback scenario, the best practice is to revert to the older code but still increase the DAG version. Increasing the version creates a new, unique dag_id that has never run in the Cloud Composer environment, so Airflow starts a new entry in the metadata database instead of appending to the old DAG's history.

example_dag_v2_1_5.py 
             | | |__ Dev version 
             | |____ QA version 
             |______ Prod Version
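Because a rollback still bumps the version, it helps to compute the next dag_id mechanically. This is a minimal sketch with a hypothetical helper, bump_dag_version; it assumes the <name>_v<major>_<minor>_<patch> naming and resets the lower components when a higher one is bumped, as semantic versioning does. Which component to bump depends on how your team maps versions to environments (the diagram above ties them to Prod, QA, and Dev).

```shell
#!/usr/bin/env bash
# bump_dag_version DAG_ID PART
# Hypothetical helper. PART is major, minor, or patch. Lower components are
# reset to 0, following semantic-versioning convention (an assumption).
bump_dag_version() {
    local dag_id="$1" part="$2"
    local re='^(.+)_v([0-9]+)_([0-9]+)_([0-9]+)$'
    if [[ ! "$dag_id" =~ $re ]]; then
        echo "not a versioned dag_id: $dag_id" >&2
        return 1
    fi
    local name="${BASH_REMATCH[1]}"
    # 10# forces base 10 so components such as "08" are not read as octal.
    local major=$((10#${BASH_REMATCH[2]}))
    local minor=$((10#${BASH_REMATCH[3]}))
    local patch=$((10#${BASH_REMATCH[4]}))
    case "$part" in
        major) major=$((major + 1)); minor=0; patch=0 ;;
        minor) minor=$((minor + 1)); patch=0 ;;
        patch) patch=$((patch + 1)) ;;
        *) echo "unknown version part: $part" >&2; return 1 ;;
    esac
    echo "${name}_v${major}_${minor}_${patch}"
}
```

For a rollback you would revert the code, rename the file to the new dag_id (for example, example_dag_v2_1_5.py to example_dag_v2_1_6.py), update the dag_id inside the DAG, and replace the old entry in running_dags.txt.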

Running DAGs File

running_dags.txt is a configuration file that determines which DAGs to stop, start, or keep running during the next iteration. The file should contain only dag_ids, without the .py extension.

Example Scenario:
Cloud Composer Environment:
> gcloud composer environments run "$ENVIRONMENT" list_dags

example_dag_1_v1_1_1 
example_dag_3_v1_1_1 # In this example we will delete this DAG

running_dags.txt:

example_dag_1_v1_1_1 # Keep in Composer environment
example_dag_2_v1_1_1 # Add to Composer environment
example_dag_3_v1_1_2 # Add to Composer environment (version update);
                     # remove example_dag_3_v1_1_1 from Composer environment

For this scenario, the deploy script outputs the dag_ids that need to be stopped and started. DAGs that are already running in the Cloud Composer environment and are listed in running_dags.txt are left unchanged. The implementation is covered in the deploy script section.

Deploy Script Output:

SPINNAKER_PROPERTY_DAGS_TO_STOP=example_dag_3_v1_1_1
SPINNAKER_PROPERTY_DAGS_TO_START=example_dag_2_v1_1_1,example_dag_3_v1_1_2
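The scenario annotates running_dags.txt with comments for illustration. In the deploy script, each line is filtered with a regex anchored at the end of the line, so an annotated line such as "example_dag_1_v1_1_1 # Keep in Composer environment" would be silently dropped; this is why the file should contain only dag_ids. If your team wants comments in the file, a minimal sketch that strips them before filtering might look like this (read_running_dags and its default pattern are assumptions, not part of the original script):

```shell
#!/usr/bin/env bash
# read_running_dags FILE [REGEX]
# Hypothetical helper: prints one dag_id per line from FILE after removing
# "#" comments (whole-line or trailing) and surrounding whitespace, keeping
# only names that match REGEX (default: an assumed versioned-name pattern).
read_running_dags() {
    local file="$1"
    local regex='^.+_v[0-9]+_[0-9]+_[0-9]+$'
    if [ $# -ge 2 ]; then
        regex="$2"
    fi
    sed -e 's/#.*//' -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//' "$file" |
        grep -iE "$regex"
}
```

The deploy script could then read DAGS_TO_RUN from this helper instead of grepping the file directly.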

Deploy Script Overview

The deploy script automates everything from setting up Airflow variables and plugins to deploying and removing DAGs. Spinnaker calls the deploy script with arguments, using its menu system to select the work to do. This section gives an overview of the Spinnaker pipeline and the bash script components it calls. The full deploy script, Dockerfile, and DAG validation code can be found here.

Spinnaker Pipeline

The following Spinnaker pipeline shows one iteration after code has been committed and built by Jenkins. In order, the pipeline stops the older versions of any DAGs, verifies that the new DAG changes are valid, and then uploads plugins, variables, and DAGs to the Cloud Composer environment.

Note: In this use case, "bake" refers to the process of generating a Kubernetes manifest file.

Below is a detailed overview of the six main steps from the Spinnaker pipeline diagram.

1. Bake Plugins & Vars

  • The manifest file is populated with the following values:
      • Image: gcr.io/PATH:TAG
      • variables-[devint|qa|prod].json
      • PROJECT
      • BUCKET
      • REGION
      • ENVIRONMENT
      • SERVICE_ACCOUNT_KEY_LOCATION
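Because error checking is left out of the snippets in this article, one cheap safeguard is to fail fast when a value the manifest should provide is missing. This sketch assumes the baked manifest exposes PROJECT, BUCKET, REGION, ENVIRONMENT, and SERVICE_ACCOUNT_KEY_LOCATION to the container as environment variables; require_env is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# require_env NAME...
# Hypothetical guard: reports each named variable that is unset or empty and
# returns 1 if any are missing, 0 otherwise.
require_env() {
    local name missing=0
    for name in "$@"; do
        # ${!name} is bash indirect expansion: the value of the variable
        # whose name is stored in $name.
        if [ -z "${!name:-}" ]; then
            echo "missing required environment variable: $name" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Example, near the top of the deploy script:
#   require_env PROJECT BUCKET REGION ENVIRONMENT SERVICE_ACCOUNT_KEY_LOCATION || exit 1
```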

2. Get DAGs To Stop/Start

  • Get the list of DAGs currently running in Cloud Composer and compare it with running_dags.txt to determine which DAGs need to be started and stopped.
>  ./deploy.sh get-stop-and-start-dags

Output:
SPINNAKER_PROPERTY_DAGS_TO_STOP=example_dag_3_v1_1_1
SPINNAKER_PROPERTY_DAGS_TO_START=example_dag_2_v1_1_1,example_dag_3_v1_1_2

3. Bake DAGs to Stop & Bake DAGs to Start

  • Using the output of get-stop-and-start-dags, bake manifest files for the Stop DAGs and Start DAGs stages.

4. Stop DAGs

  • Pause the DAG, delete the DAG file from GCS, and delete its metadata from the Airflow UI.
>  ./deploy.sh stop-dag DAG_ID

5. Upload Plugins & Vars

  • Initialize a local Airflow environment (paths to the DAG and plugins folders)
  • Load Airflow variables and connections into the local Airflow environment
  • Validate DAGs
  • Upload Airflow variables and the plugins folder to the Cloud Composer environment

6. Start DAGs

  • Upload the DAG file to GCS and unpause the DAG.
>  ./deploy.sh start-dag DAG_ID
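For testing outside Spinnaker, the order of these stages can be reproduced locally. This is a hypothetical sketch, not the OpenX pipeline: it assumes the script is invoked as ./deploy.sh (overridable through DEPLOY) and takes the variables file name as the argument to upload-plugins-and-variables. It skips the bake stages (steps 1 and 3) and runs the stop and start calls one at a time, whereas Spinnaker runs each stage as a baked Kubernetes job.

```shell
#!/usr/bin/env bash
# run_iteration VARIABLES_FILE
# Hypothetical local driver that mirrors steps 2, 4, 5, and 6 of the pipeline.
# DEPLOY can point to the real script or to a stub for a dry run.
DEPLOY="${DEPLOY:-./deploy.sh}"

run_iteration() {
    local vars_file="$1" line dag_id to_stop="" to_start=""

    # Step 2: capture the SPINNAKER_PROPERTY_* lines printed by the script.
    while IFS= read -r line; do
        case "$line" in
            SPINNAKER_PROPERTY_DAGS_TO_STOP=*)  to_stop="${line#*=}" ;;
            SPINNAKER_PROPERTY_DAGS_TO_START=*) to_start="${line#*=}" ;;
        esac
    done < <($DEPLOY get-stop-and-start-dags)

    # Step 4: stop each old DAG (the comma-separated list becomes words).
    for dag_id in ${to_stop//,/ }; do
        $DEPLOY stop-dag "$dag_id" || return 1
    done

    # Step 5: validate DAGs, then upload variables and plugins.
    $DEPLOY upload-plugins-and-variables "$vars_file" || return 1

    # Step 6: start each new DAG.
    for dag_id in ${to_start//,/ }; do
        $DEPLOY start-dag "$dag_id" || return 1
    done
}
```

Pointing DEPLOY at a stub function makes it possible to confirm the call order without touching a real Composer environment.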

Airflow Configuration

Before running the deploy script, we made sure the Cloud Composer environment was already provisioned and had the Airflow configuration option dags_are_paused_at_creation set to True:

gcloud composer environments update $ENVIRONMENT \
    --location=$LOCATION \
    --update-airflow-configs=core-dags_are_paused_at_creation=true

Note: The code snippets in this article are a high-level proof of concept. Error checking and miscellaneous actions have been left out to keep the article short.

Before we enabled this setting, Cloud Composer behaved oddly when stopping one DAG and starting another. The flow for stopping a DAG is: pause the DAG, delete the file from GCS, and delete the metadata. Note that after a DAG is stopped and deleted, Cloud Composer still shows it in the UI for a while because of the sync delay. The flow for starting a DAG was simply to upload the file; Cloud Composer then unpaused the DAG and began running its tasks without any intervention. However, during the transition from stopping an old DAG to starting a new one, Cloud Composer would unpause the old DAG again, making the CI/CD process inconsistent and unreliable. To work around this quirk, we enabled dags_are_paused_at_creation and added an explicit unpause step to the flow for starting a DAG.

Deploy Script Menu

The deploy script contains a menu system that Spinnaker uses to deploy and tear down DAGs. Spinnaker calls each component by passing one of the following arguments:

  • get-stop-and-start-dags
  • stop-dag
  • upload-plugins-and-variables
  • start-dag
case "$1" in
   get-stop-and-start-dags)
       get-stop-and-start-dags
       ;;
   stop-dag)
       handle_delete "$2"
       ;;
   upload-plugins-and-variables)
       validate_dags_and_variables "$2"

       handle_variables "$2"
       handle_plugins
       ;;
   start-dag)
       handle_new "$2"
       ;;
   *)
       echo "Usage: $1 is not valid. { get-stop-and-start-dags | \
         stop-dag | upload-plugins-and-variables | start-dag }"
       exit 1
       ;;
esac

List Stop and Start DAGs

The get-stop-and-start-dags function compares the DAGs currently running in the Composer environment with the DAGs listed in the running_dags.txt configuration file, and lists the DAGs that need to be stopped and started. It prints the results as SPINNAKER_PROPERTY_ lines, and Spinnaker uses that output to configure the stop-dag and start-dag Kubernetes manifest files.

# Outputs a list of DAGs that need to be started and stopped based on
# the current running DAGs on the Cloud Composer environment and the 
# running_dags.txt file.
function get-stop-and-start-dags() {
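    # Matches <name>_v<major>_<minor>_<patch>, e.g. example_dag_1_v1_1_1.
    DAGNAME_REGEX=".+_v[0-9]+_[0-9]+_[0-9]+$"

    # dag_ids currently registered in the Cloud Composer environment.
    RUNNING_DAGS=$(gcloud -q composer environments run "$ENVIRONMENT" \
        --location "$REGION" list_dags 2>&1 | sed -e '1,/DAGS/d' | \
        tail -n +2 | sed '/^[[:space:]]*$/d' | grep -iE "$DAGNAME_REGEX")

    # dag_ids that should be running at the end of this deployment.
    DAGS_TO_RUN=$(grep -iE "$DAGNAME_REGEX" running_dags.txt)

    # arrayDiff (a helper in the full deploy script, not shown here) prints
    # the entries of the first list that are missing from the second.
    DAGS_TO_STOP=$(arrayDiff "${RUNNING_DAGS[@]}" "${DAGS_TO_RUN[@]}")
    DAGS_TO_START=$(arrayDiff "${DAGS_TO_RUN[@]}" "${RUNNING_DAGS[@]}")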

    echo "SPINNAKER_PROPERTY_DAGS_TO_STOP=${DAGS_TO_STOP// /,}"
    echo "SPINNAKER_PROPERTY_DAGS_TO_START=${DAGS_TO_START// /,}"
}
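The article does not show arrayDiff. Given how it is called, with two newline-separated strings of dag_ids, one plausible implementation is below. It is a hypothetical sketch, not the helper from the full deploy script, and it assumes dag_ids never contain whitespace or glob characters (true for names matching the naming convention).

```shell
#!/usr/bin/env bash
# arrayDiff LIST_A LIST_B
# Hypothetical stand-in for the helper called above. Each argument is a
# whitespace- or newline-separated list of dag_ids. Prints the items of LIST_A
# that are not in LIST_B on one line, separated by single spaces, which the
# ${VAR// /,} expansion then turns into a comma-separated list.
arrayDiff() {
    local -a result=()
    local a b found
    for a in $1; do
        found=0
        for b in $2; do
            if [ "$a" = "$b" ]; then
                found=1
                break
            fi
        done
        if [ "$found" -eq 0 ]; then
            result+=("$a")
        fi
    done
    echo "${result[*]}"
}
```

With the lists from the example scenario, arrayDiff prints example_dag_3_v1_1_1 for the DAGs to stop and "example_dag_2_v1_1_1 example_dag_3_v1_1_2" for the DAGs to start, which the comma substitution turns into the Deploy Script Output shown earlier.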

Stop DAG

To stop a DAG, the handle_delete function pauses the DAG, deletes its file from GCS, and then deletes the DAG from the Airflow UI. Because of Composer's sync delay, the delete command does not always remove the metadata on the first attempt, so it is best to repeat it a few times at one-minute intervals.

# Perform a sequence of steps to delete a DAG.
# $1 dag_id (string)
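function handle_delete() {
    local DAG_ID="$1"
    local FILENAME="$1.py"

    # Pause DAG
    gcloud -q composer environments run "$ENVIRONMENT" \
        --location "$REGION" pause -- "$DAG_ID"

    # Delete DAG file from the dags/ folder of the Composer bucket
    gcloud -q composer environments storage dags delete \
        --environment="$ENVIRONMENT" --location="$REGION" \
        -- "$FILENAME"

    # Delete DAG metadata so the DAG disappears from the Airflow UI
    gcloud -q composer environments run "$ENVIRONMENT" \
        --location "$REGION" delete_dag -- "$DAG_ID"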
}
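The repeated delete can be wrapped in a small helper. This hypothetical sketch runs a command a fixed number of times with a pause between runs and ignores failures, on the assumption that delete_dag may report an error once the DAG's metadata is already gone. The three runs and 60-second delay in the example are illustrative values matching the "a few times at one-minute intervals" advice.

```shell
#!/usr/bin/env bash
# repeat_every TIMES DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND exactly TIMES times, sleeping DELAY_SECONDS
# between runs. Failures are reported on stderr and otherwise ignored.
repeat_every() {
    local times="$1" delay="$2" i
    shift 2
    for ((i = 1; i <= times; i++)); do
        "$@" || echo "run $i of $times failed (ignored): $*" >&2
        if [ "$i" -lt "$times" ]; then
            sleep "$delay"
        fi
    done
}

# Example (illustrative values), replacing the last command in handle_delete:
#   repeat_every 3 60 gcloud -q composer environments run "$ENVIRONMENT" \
#       --location "$REGION" delete_dag -- "$DAG_ID"
```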

Validate DAGs, Variables, and Connections

Before any new changes are uploaded to the Cloud Composer environment, a sanity check validates the new DAGs. For this local check, a local Airflow environment is set up to mimic the Cloud Composer environment, and the variables file for the target environment (development, QA, or production) is imported into it. Airflow connections are entered manually in the Cloud Composer environment; the script copies their connection IDs into the local Airflow environment with dummy values. Once the local Airflow environment is set up, DAG validation checks the DagBag to ensure that:

  • There are no syntax errors
  • There are no DAG cycles
  • Referenced Airflow variables exist in the variables JSON file
  • Airflow connections used exist in the Cloud Composer environment

# Validate all DAGs in the running_dags.txt file using a local Airflow environment.
# $1 variables-ENV.json file (string)
function validate_dags_and_variables() {
    DAGNAME_REGEX=".+_v[0-9]+_[0-9]+_[0-9]+$"

    FERNET_KEY=$(python3.6 -c "from cryptography.fernet import Fernet; \
        print(Fernet.generate_key().decode('utf-8'))")

    export FERNET_KEY

    # Create the local Airflow metadata database.
    airflow initdb

    # Import Airflow Variables to local Airflow.
    airflow variables --import variables/"$1"

    # Get current Cloud Composer custom connections (skipping the *_default
    # and local_mysql connections).
    AIRFLOW_CONN_LIST=$(gcloud composer environments run "$ENVIRONMENT" \
        --location "$REGION" connections -- --list 2>&1 | grep "?\s" | \
        awk '{ FS = "?"}; {print $2}' | \
        tr -d ' ' | sed -e "s/'//g" | grep -v '_default$' | \
        grep -v 'local_mysql' | tail -n +3 | grep -v "\.\.\.")

    # Upload custom connections to local Airflow with dummy values.
    for conn_id in $AIRFLOW_CONN_LIST; do
        airflow connections --add --conn_id "$conn_id" --conn_type http
    done

    RUNNING_DAGS=$(grep -iE "$DAGNAME_REGEX" running_dags.txt)

    # Copy all RUNNING_DAGS to local Airflow dags folder for DAG validation.
    for dag_id in $RUNNING_DAGS; do
        cp "$dag_id".py /usr/local/airflow/dags
    done

    # List all DAGs registered in local Airflow.
    LOCAL_AIRFLOW_LIST_DAGS=$(airflow list_dags | sed -e '1,/DAGS/d' | \
        tail -n +2 | sed '/^[[:space:]]*$/d' | grep -iE "$DAGNAME_REGEX")

    # Run DAG validation tests.
    python3.6 dag_validation_test.py "$LOCAL_AIRFLOW_LIST_DAGS"
}
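The DagBag check needs a local Airflow installation. A cheaper static pre-check for the variables rule can run even without Airflow, for example early in the Jenkins build. This hypothetical sketch only finds Variable.get calls whose key is a string literal, ignores Jinja references such as {{ var.value.key }}, and only checks that the key appears as a quoted string in the variables file, so it complements rather than replaces the DagBag validation.

```shell
#!/usr/bin/env bash
# check_variable_refs VARIABLES_JSON DAG_FILE...
# Hypothetical static check: reports literal Variable.get("key") / ('key')
# references whose key never appears as a quoted string in VARIABLES_JSON.
# Returns 1 if any key is missing, 0 otherwise.
check_variable_refs() {
    local vars_json="$1" status=0 key
    shift
    while read -r key; do
        [ -n "$key" ] || continue
        if ! grep -qF "\"$key\"" "$vars_json"; then
            echo "variable not defined in $vars_json: $key"
            status=1
        fi
    done < <(grep -ohE "Variable\.get\([\"'][^\"']+[\"']" "$@" |
                 sed -E "s/^Variable\.get\([\"']//; s/[\"']$//" | sort -u)
    return "$status"
}
```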

Upload Variables

The handle_variables function uploads the variables file to the data directory of the Composer GCS bucket, from which it is synced to the worker nodes. The file must be uploaded first because the Airflow variables import runs inside the Composer environment and cannot read a local file from the machine running the deploy script; the file has to exist in the data directory, which the workers see as /home/airflow/gcs/data. Once the file has been uploaded, the Airflow variables are imported from it.

# Upload and set Airflow variables
# $1 variables-ENV.json file (string)
function handle_variables() {
    local FILENAME="$1"

    # Upload variables-ENV.json to the data/ folder of the Composer bucket
    gcloud -q composer environments storage data import \
        --source=variables/"$FILENAME" \
        --environment="$ENVIRONMENT" --location="$REGION"

    # Import the variables from the copy synced to /home/airflow/gcs/data
    gcloud -q composer environments run "$ENVIRONMENT" \
        --location "$REGION" variables \
        -- -i /home/airflow/gcs/data/"$FILENAME"
}

Upload Plugins

Plugins are uploaded with a single gsutil rsync command, which copies, updates, and deletes files so that the bucket's plugins directory mirrors the repo's plugins directory. The same could be written with the gcloud composer environments storage plugins commands, but then the script would have to work out which files need to be added, modified, or deleted.

# Upload Airflow Plugins
function handle_plugins() {
    # Sync local and GCS Plugins directory
    gsutil -q rsync -r -d plugins/ gs://"$BUCKET"/plugins/
}

Start DAG

To start a DAG, the handle_new function uploads the file to GCS and unpauses the DAG. Because of Cloud Composer's sync delay, the unpause command does not work on the first attempt, since the new DAG has not been picked up yet. It is best to repeat the command a few times at one-minute intervals.

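# Upload a DAG file to the Composer bucket and unpause the DAG.
# $1 dag_id (string)
function handle_new() {
    local DAG_ID="$1"
    local FILENAME="$1.py"

    # Upload file to the dags/ folder of the Composer bucket
    gcloud -q composer environments storage dags import \
        --environment="$ENVIRONMENT" --location="$REGION" \
        --source="$FILENAME"

    # Unpause the DAG (required because dags_are_paused_at_creation is enabled)
    gcloud -q composer environments run "$ENVIRONMENT" \
        --location "$REGION" unpause -- "$DAG_ID"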
}
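Unlike the delete step, the unpause has a usable success signal, so the repetition can stop as soon as it works. This hypothetical retry helper re-runs a command until it exits successfully or runs out of attempts. It assumes that unpause exits with a non-zero status while Composer has not yet picked up the new DAG; the five attempts and 60-second delay in the example are illustrative values.

```shell
#!/usr/bin/env bash
# retry MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: re-runs COMMAND until it exits 0 or MAX_ATTEMPTS runs
# have failed, sleeping DELAY_SECONDS between attempts. Returns 0 on success,
# otherwise the exit status of the last attempt.
retry() {
    local max="$1" delay="$2" attempt=1 status
    shift 2
    while true; do
        "$@" && return 0
        status=$?
        if [ "$attempt" -ge "$max" ]; then
            echo "giving up after $attempt attempts: $*" >&2
            return "$status"
        fi
        echo "attempt $attempt failed (status $status), retrying in ${delay}s" >&2
        attempt=$((attempt + 1))
        sleep "$delay"
    done
}

# Example (illustrative values), for the unpause step in handle_new:
#   retry 5 60 gcloud -q composer environments run "$ENVIRONMENT" \
#       --location "$REGION" unpause -- "$DAG_ID"
```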

Note: DAGs are usually picked up by Composer within a minute.

Conclusion

Cloud Composer brings great value to companies that need to orchestrate their batch data workflows. The CI/CD solution above was built for OpenX's use case, and yours may differ. The most important takeaways are the high-level architecture and the deploy script, which together solve DAG versioning in the Composer environment.

If you are looking for additional Cloud Composer resources or information about the Spinnaker side of things, check out these resources:

If you still have questions, drop us a line at info@SpringML.com or tweet us @springmlinc.

Software used:
Composer-1.7.1
Airflow-1.10.2
Python-3.6
Jenkins-2.176.1
Spinnaker-1.14

Acknowledgements:
Big thanks to Jacob Ferriero (Data Engineer Lead at Google) and Miles Matthias (GCP/Spinnaker Consultant) for helping build this solution, and to OpenX for giving us the opportunity to solve this problem.