Airflow DAG Configuration Examples

This page provides complete, ready-to-use YAML DAG configuration examples for every Airflow execution strategy. Each example includes both load and transform configurations with all relevant options documented inline.

For option descriptions, see the Options Reference. For customization details, see Customize Airflow DAGs.

Bash (on-premise)

Use StarlakeAirflowBashJob to execute starlake commands directly via BashOperator. Best for on-premise or VM-based deployments.

Load

metadata/dags/airflow_bash_load.sl.yml
```yaml
dag:
  comment: "Load tables using Bash on Airflow"
  template: "load/airflow_scheduled_table_bash.py.j2"
  # One DAG per domain. Use "airflow_all_tables.py" for a single DAG.
  filename: "airflow_{{domain}}_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── Bash execution ────────────────────────────────────────────────
    # Path to the starlake CLI executable
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"
    # OS environment variables to forward to the bash command.
    # Use "*" to forward all variables.
    sl_include_env_vars: "GOOGLE_APPLICATION_CREDENTIALS,AWS_KEY_ID,AWS_SECRET_KEY"

    # ── Pre-load strategy ─────────────────────────────────────────────
    # Check for incoming files before loading (none | imported | pending | ack)
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "60"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "starlake_pool"
    max_active_runs: "3"
    tags: "starlake load bash"
```
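The `sl_include_env_vars` option above controls which OS environment variables reach the generated bash command. A minimal sketch of that filtering logic (a hypothetical helper for illustration, not Starlake's actual implementation):

```python
def forward_env_vars(include_spec: str, env: dict[str, str]) -> dict[str, str]:
    """Select which environment variables to forward to the bash command.

    include_spec is either "*" (forward everything) or a comma-separated
    list of variable names, mirroring the sl_include_env_vars option.
    """
    if include_spec.strip() == "*":
        return dict(env)
    wanted = {name.strip() for name in include_spec.split(",") if name.strip()}
    return {k: v for k, v in env.items() if k in wanted}

env = {
    "GOOGLE_APPLICATION_CREDENTIALS": "/secrets/key.json",
    "AWS_KEY_ID": "AKIA-EXAMPLE",
    "HOME": "/home/airflow",
}
print(forward_env_vars("GOOGLE_APPLICATION_CREDENTIALS,AWS_KEY_ID", env))
```

Only the named variables are forwarded; anything else in the worker's environment (like `HOME` here) stays out of the starlake process.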

Transform

metadata/dags/airflow_bash_transform.sl.yml
```yaml
dag:
  comment: "Run transforms using Bash on Airflow"
  template: "transform/airflow_scheduled_task_bash.py.j2"
  filename: "airflow_{{domain}}_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── Bash execution ────────────────────────────────────────────────
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"
    sl_include_env_vars: "GOOGLE_APPLICATION_CREDENTIALS"

    # ── Dependencies ──────────────────────────────────────────────────
    # true  = include all upstream loads/transforms in this DAG
    # false = use Airflow data-aware scheduling (Datasets)
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "60"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "starlake_pool"
    tags: "starlake transform bash"
```

Transform with data-aware scheduling

metadata/dags/airflow_bash_transform_dataset.sl.yml
```yaml
dag:
  comment: "Run transforms using Bash with data-aware scheduling"
  template: "transform/airflow_scheduled_task_bash.py.j2"
  filename: "airflow_{{domain}}_tasks_scheduled.py"
  options:
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets"
      }
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"

    # Dependencies handled by Airflow Datasets instead of inline
    run_dependencies_first: "false"

    # How upstream datasets trigger this DAG (any | all | boolean expression)
    dataset_triggering_strategy: "all"

    retries: "2"
    retry_delay: "60"
    default_pool: "starlake_pool"
    tags: "starlake transform bash scheduled"
```
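The `dataset_triggering_strategy` values map onto Airflow's data-aware scheduling conditions: `any` fires as soon as one upstream dataset is updated, while `all` waits until every upstream dataset has been updated since the last run. The two set-based strategies can be sketched like this (illustrative logic only, not Starlake's or Airflow's code):

```python
def should_trigger(strategy: str, updated: set[str], upstream: set[str]) -> bool:
    """Decide whether a downstream DAG run should fire.

    strategy: "any" fires on the first upstream dataset update,
              "all" waits until every upstream dataset has been updated.
    """
    if strategy == "any":
        return len(updated & upstream) > 0
    if strategy == "all":
        return upstream <= updated
    raise ValueError(f"unsupported strategy: {strategy}")

upstream = {"sales.orders", "sales.customers"}
print(should_trigger("any", {"sales.orders"}, upstream))  # True
print(should_trigger("all", {"sales.orders"}, upstream))  # False
```

With `"all"`, a transform that reads two upstream tables only runs once both have been refreshed, which is usually what you want for joins.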

GCP Cloud Run

Use StarlakeAirflowCloudRunJob to execute starlake commands via Cloud Run jobs. Best for serverless GCP deployments.

Prerequisites

Before configuring Starlake to use Cloud Run, you must create the Cloud Run job in your GCP project. This is different from a Cloud Run service — jobs run to completion and exit, which is the execution model Starlake uses.

```bash
# Create the Cloud Run job with the starlake Docker image
gcloud run jobs create starlake-load \
  --image=gcr.io/my-gcp-project/starlake:latest \
  --region=europe-west1 \
  --project=my-gcp-project \
  --service-account=starlake-sa@my-gcp-project.iam.gserviceaccount.com \
  --memory=2Gi \
  --cpu=1 \
  --task-timeout=3600s \
  --max-retries=0

# Create a separate job for transforms (optional: you can reuse the same job)
gcloud run jobs create starlake-transform \
  --image=gcr.io/my-gcp-project/starlake:latest \
  --region=europe-west1 \
  --project=my-gcp-project \
  --service-account=starlake-sa@my-gcp-project.iam.gserviceaccount.com \
  --memory=4Gi \
  --cpu=2 \
  --task-timeout=7200s \
  --max-retries=0
```

The cloud_run_job_name option in your DAG configuration must match the job name created above (e.g. starlake-load, starlake-transform).

tip

Set --max-retries=0 on the Cloud Run job itself and let the orchestrator (Airflow) handle retries via the retries option. This avoids double retry loops.
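To see why stacked retries are a problem, note that the attempt counts multiply: each Airflow attempt launches a Cloud Run execution that would itself retry. A quick back-of-the-envelope check:

```python
def total_attempts(orchestrator_retries: int, job_max_retries: int) -> int:
    """Worst-case number of container executions on persistent failure.

    Each orchestrator attempt (1 initial run + retries) triggers a Cloud Run
    execution that itself runs 1 + job_max_retries times before giving up.
    """
    return (1 + orchestrator_retries) * (1 + job_max_retries)

# retries: "3" in Airflow, --max-retries=0 on the job -> 4 container runs
print(total_attempts(3, 0))  # 4
# Same Airflow retries but --max-retries=3 on the job -> 16 container runs
print(total_attempts(3, 3))  # 16
```

Keeping the job-level retries at zero leaves the orchestrator as the single place where retry behavior is configured and observed.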

Load

metadata/dags/airflow_cloud_run_load.sl.yml
```yaml
dag:
  comment: "Load tables using Cloud Run on Airflow"
  template: "load/airflow_scheduled_table_cloud_run.py.j2"
  filename: "airflow_{{domain}}_cloud_run_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_DATASETS": "gs://my-bucket/starlake/datasets",
        "SL_ENV": "BIGQUERY"
      }

    # ── Cloud Run execution ───────────────────────────────────────────
    # GCP project ID (defaults to $GCP_PROJECT env var)
    cloud_run_project_id: "my-gcp-project"
    # Name of the Cloud Run job to execute (REQUIRED)
    cloud_run_job_name: "starlake-load"
    # Region where the Cloud Run job is deployed
    cloud_run_job_region: "europe-west1"
    # Service account for the Cloud Run job (optional)
    cloud_run_service_account: "[email protected]"
    # Run asynchronously with a completion sensor (recommended for long jobs)
    cloud_run_async: "true"
    # Polling interval in seconds when running async
    cloud_run_async_poke_interval: "10"
    # Retry on failure
    retry_on_failure: "false"
    # Delay between retries
    retry_delay_in_seconds: "10"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "Europe/Paris"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "cloud_run_pool"
    max_active_runs: "2"
    tags: "starlake load cloud_run"
```
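With `cloud_run_async: "true"`, the operator starts the job and a sensor then polls for completion every `cloud_run_async_poke_interval` seconds instead of blocking a worker slot for the whole run. The polling pattern, sketched with a stand-in status function (a simplified illustration, not the actual sensor code):

```python
import time

def wait_for_completion(get_status, poke_interval: float, timeout: float) -> str:
    """Poll get_status() until the job reaches a terminal state.

    Mirrors what a completion sensor does, where poke_interval plays
    the role of cloud_run_async_poke_interval.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in ("SUCCEEDED", "FAILED"):
            return status
        time.sleep(poke_interval)
    raise TimeoutError("job did not finish in time")

# Simulate a job that succeeds on the third poll.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_completion(lambda: next(statuses), poke_interval=0.01, timeout=5))
```

A longer poke interval reduces API calls; a shorter one reduces the lag between job completion and downstream tasks starting.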

Transform

metadata/dags/airflow_cloud_run_transform.sl.yml
```yaml
dag:
  comment: "Run transforms using Cloud Run on Airflow"
  template: "transform/airflow_scheduled_task_cloud_run.py.j2"
  filename: "airflow_{{domain}}_cloud_run_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_DATASETS": "gs://my-bucket/starlake/datasets",
        "SL_ENV": "BIGQUERY"
      }

    # ── Cloud Run execution ───────────────────────────────────────────
    cloud_run_project_id: "my-gcp-project"
    cloud_run_job_name: "starlake-transform"
    cloud_run_job_region: "europe-west1"
    cloud_run_async: "true"
    cloud_run_async_poke_interval: "15"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "Europe/Paris"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "cloud_run_pool"
    tags: "starlake transform cloud_run"
```

GCP Dataproc

Use StarlakeAirflowDataprocJob to submit starlake commands as Spark jobs on a Dataproc cluster. Best for Spark-based workloads on GCP.

Load

metadata/dags/airflow_dataproc_load.sl.yml
```yaml
dag:
  comment: "Load tables using Dataproc on Airflow"
  template: "load/airflow_scheduled_table_dataproc.py.j2"
  filename: "airflow_{{domain}}_dataproc_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_ENV": "BIGQUERY"
      }

    # ── Dataproc cluster ──────────────────────────────────────────────
    # GCP project ID
    dataproc_project_id: "my-gcp-project"
    # GCP region for the Dataproc cluster
    dataproc_region: "europe-west1"
    # VPC subnet
    dataproc_subnet: "default"
    # Cluster name identifier
    dataproc_name: "starlake-cluster"
    # Dataproc image version
    dataproc_image_version: "2.2-debian12"
    # Idle cluster TTL in seconds (auto-delete after idle)
    dataproc_idle_delete_ttl: "3600"

    # ── Dataproc master node ──────────────────────────────────────────
    dataproc_master_machine_type: "n1-standard-4"
    dataproc_master_disk_type: "pd-standard"
    dataproc_master_disk_size: "1024"

    # ── Dataproc worker nodes ─────────────────────────────────────────
    dataproc_num_workers: "4"
    dataproc_worker_machine_type: "n1-standard-4"
    dataproc_worker_disk_type: "pd-standard"
    dataproc_worker_disk_size: "1024"

    # ── Spark configuration ───────────────────────────────────────────
    # Comma-separated list of JAR files
    spark_jar_list: "gs://my-bucket/jars/starlake-assembly.jar"
    # Main class for the Spark job
    spark_job_main_class: "ai.starlake.job.Main"
    # GCS bucket for Spark logs and temp storage
    spark_bucket: "my-spark-bucket"
    # Spark executor resources (optional, per-task overrides also supported)
    spark_executor_memory: "8g"
    spark_executor_cores: "4"
    spark_executor_instances: "2"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "300"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "dataproc_pool"
    tags: "starlake load dataproc"
```
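It is worth checking that the executors you request actually fit the cluster you provision. Assuming n1-standard-4 workers (4 vCPUs, 15 GB RAM each) with roughly 12 GB usable after YARN and OS overhead, a rough capacity check (the figures and the helper are illustrative, not a YARN admission guarantee):

```python
def executors_fit(num_workers: int, vcpus_per_worker: int,
                  usable_mem_gb_per_worker: float,
                  executor_cores: int, executor_mem_gb: float,
                  executor_instances: int) -> bool:
    """Rough check that the requested Spark executors fit the cluster.

    Ignores off-heap overhead and the driver; each worker can host as many
    executors as both its cores and its usable memory allow.
    """
    per_worker = min(vcpus_per_worker // executor_cores,
                     int(usable_mem_gb_per_worker // executor_mem_gb))
    return per_worker * num_workers >= executor_instances

# 4 x n1-standard-4 workers (~12 GB usable each)
# vs. the example's 2 executors of 4 cores / 8 GB each
print(executors_fit(4, 4, 12, 4, 8, 2))  # True
```

If this returns `False` for your settings, either lower `spark_executor_instances` / `spark_executor_memory` or pick a larger `dataproc_worker_machine_type`.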

Transform

metadata/dags/airflow_dataproc_transform.sl.yml
```yaml
dag:
  comment: "Run transforms using Dataproc on Airflow"
  template: "transform/airflow_scheduled_task_dataproc.py.j2"
  filename: "airflow_{{domain}}_dataproc_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_ENV": "BIGQUERY"
      }

    # ── Dataproc cluster ──────────────────────────────────────────────
    dataproc_project_id: "my-gcp-project"
    dataproc_region: "europe-west1"
    dataproc_name: "starlake-cluster"
    dataproc_idle_delete_ttl: "3600"

    # ── Dataproc nodes ────────────────────────────────────────────────
    dataproc_master_machine_type: "n1-standard-8"
    dataproc_num_workers: "4"
    dataproc_worker_machine_type: "n1-standard-4"

    # ── Spark configuration ───────────────────────────────────────────
    spark_jar_list: "gs://my-bucket/jars/starlake-assembly.jar"
    spark_bucket: "my-spark-bucket"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "300"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "dataproc_pool"
    tags: "starlake transform dataproc"
```

AWS Fargate

Use StarlakeAirflowFargateJob to execute starlake commands as ECS tasks on AWS Fargate. Best for serverless AWS deployments.

Load

metadata/dags/airflow_fargate_load.sl.yml
```yaml
dag:
  comment: "Load tables using Fargate on Airflow"
  template: "load/airflow_scheduled_table_fargate.py.j2"
  filename: "airflow_{{domain}}_fargate_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "s3://my-bucket/starlake",
        "SL_DATASETS": "s3://my-bucket/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── AWS Fargate execution ─────────────────────────────────────────
    # Airflow connection ID for AWS
    aws_conn_id: "aws_default"
    # AWS region
    aws_region: "eu-west-1"
    # ECS cluster name (REQUIRED)
    aws_cluster_name: "starlake-ecs-cluster"
    # ECS task definition name (REQUIRED)
    aws_task_definition_name: "starlake-load-task"
    # Container name in the task definition (REQUIRED)
    aws_task_definition_container_name: "starlake"
    # Private subnets for the task (REQUIRED, JSON array)
    aws_task_private_subnets: '["subnet-0abc123def456789a", "subnet-0def456abc789012b"]'
    # Security groups for the task (REQUIRED, JSON array)
    aws_task_security_groups: '["sg-0abc123def456789a"]'
    # CPU units for the container override (1024 = 1 vCPU)
    cpu: "1024"
    # Memory in MB for the container override
    memory: "2048"
    # Polling interval in seconds for task completion
    fargate_async_poke_interval: "30"
    # Retry the Fargate task on failure
    retry_on_failure: "false"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "fargate_pool"
    max_active_runs: "2"
    tags: "starlake load fargate"
```
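Fargate only accepts specific cpu/memory pairings, so the `cpu` and `memory` overrides above must form a valid combination (for example, 1024 CPU units support 2 GB to 8 GB of memory). A small validator covering the common tiers; the table below is a subset of AWS's published combinations and should be checked against current AWS documentation:

```python
# Valid memory values (MB) per Fargate CPU value, common tiers only.
FARGATE_COMBOS = {
    256:  [512, 1024, 2048],
    512:  list(range(1024, 4097, 1024)),
    1024: list(range(2048, 8193, 1024)),
    2048: list(range(4096, 16385, 1024)),
    4096: list(range(8192, 30721, 1024)),
}

def is_valid_fargate_combo(cpu: int, memory: int) -> bool:
    """Check a Fargate task cpu/memory override against the common tiers."""
    return memory in FARGATE_COMBOS.get(cpu, [])

print(is_valid_fargate_combo(1024, 2048))  # True  (the load example above)
print(is_valid_fargate_combo(2048, 4096))  # True  (the transform example)
print(is_valid_fargate_combo(1024, 1024))  # False (too little memory for 1 vCPU)
```

Passing an unsupported pairing makes ECS reject the task at launch, which surfaces in Airflow as a failed task with no container logs, so it is worth validating up front.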

Transform

metadata/dags/airflow_fargate_transform.sl.yml
```yaml
dag:
  comment: "Run transforms using Fargate on Airflow"
  template: "transform/airflow_scheduled_task_fargate.py.j2"
  filename: "airflow_{{domain}}_fargate_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "s3://my-bucket/starlake",
        "SL_DATASETS": "s3://my-bucket/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── AWS Fargate execution ─────────────────────────────────────────
    aws_conn_id: "aws_default"
    aws_region: "eu-west-1"
    aws_cluster_name: "starlake-ecs-cluster"
    aws_task_definition_name: "starlake-transform-task"
    aws_task_definition_container_name: "starlake"
    aws_task_private_subnets: '["subnet-0abc123def456789a", "subnet-0def456abc789012b"]'
    aws_task_security_groups: '["sg-0abc123def456789a"]'
    # More resources for transforms
    cpu: "2048"
    memory: "4096"
    fargate_async_poke_interval: "30"
    retry_on_failure: "true"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Airflow-specific ──────────────────────────────────────────────
    default_pool: "fargate_pool"
    tags: "starlake transform fargate"
```

Quick comparison

| Strategy | Best for | Cloud provider | Key required options |
|---|---|---|---|
| Bash | On-premise, VMs | Any | `SL_STARLAKE_PATH` |
| Cloud Run | Serverless GCP | GCP | `cloud_run_job_name` |
| Dataproc | Spark workloads | GCP | `spark_jar_list`, `spark_bucket`, `dataproc_project_id` |
| Fargate | Serverless AWS | AWS | `aws_cluster_name`, `aws_task_definition_name`, `aws_task_definition_container_name`, `aws_task_private_subnets`, `aws_task_security_groups` |

Minimal configurations

If you want the simplest possible setup, here are minimal configurations that rely on defaults for everything else:

Minimal Bash

```yaml
dag:
  template: "load/airflow_scheduled_table_bash.py.j2"
  filename: "airflow_all_tables.py"
```

Minimal Cloud Run

```yaml
dag:
  template: "load/airflow_scheduled_table_cloud_run.py.j2"
  filename: "airflow_all_tables.py"
  options:
    cloud_run_job_name: "starlake-load"
```

Minimal Fargate

```yaml
dag:
  template: "load/airflow_scheduled_table_fargate.py.j2"
  filename: "airflow_all_tables.py"
  options:
    aws_cluster_name: "my-cluster"
    aws_task_definition_name: "starlake-task"
    aws_task_definition_container_name: "starlake"
    aws_task_private_subnets: '["subnet-abc123"]'
    aws_task_security_groups: '["sg-abc123"]'
```