Dagster DAG Configuration Examples

This page provides complete, ready-to-use YAML DAG configuration examples for every Dagster execution strategy. Each example includes both load and transform configurations with all relevant options documented inline.

For option descriptions, see the Options Reference. For customization details, see Customize Dagster DAGs.

Shell (on-premise)

Use StarlakeDagsterShellJob to execute starlake commands directly via the dagster-shell library. Best for on-premise or local development.

Load

metadata/dags/dagster_shell_load.sl.yml
dag:
  comment: "Load tables using Shell on Dagster"
  template: "load/dagster_scheduled_table_shell.py.j2"
  # One DAG per domain. Use "dagster_all_load.py" for a single DAG.
  filename: "dagster_{{domain}}_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── Shell execution ───────────────────────────────────────────────
    # Path to the starlake CLI executable
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"

    # ── Pre-load strategy ─────────────────────────────────────────────
    # Check for incoming files before loading (none | imported | pending | ack)
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "60"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake load shell"
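The sl_env_var value must parse as a single JSON object; the generated DAG merges those keys into the environment of each starlake command it launches. A minimal sketch of that behavior (illustration only, not the actual template code):

```python
import json
import os

# Value of the sl_env_var option, exactly as in the YAML above.
sl_env_var = (
    '{"SL_ROOT": "/opt/starlake",'
    ' "SL_DATASETS": "/opt/starlake/datasets",'
    ' "SL_ENV": "PROD"}'
)

# A malformed value (trailing comma, single quotes, ...) fails here,
# so validating the JSON up front catches configuration errors early.
overrides = json.loads(sl_env_var)

# Merge into a copy of the current environment for the child process.
env = {**os.environ, **overrides}
print(env["SL_ROOT"])  # /opt/starlake
```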

Transform

metadata/dags/dagster_shell_transform.sl.yml
dag:
  comment: "Run transforms using Shell on Dagster"
  template: "transform/dagster_scheduled_task_shell.py.j2"
  filename: "dagster_{{domain}}_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── Shell execution ───────────────────────────────────────────────
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"

    # ── Dependencies ──────────────────────────────────────────────────
    # true = include all upstream loads/transforms in this job
    # false = use Dagster Multi Asset Sensor for dependency tracking
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "60"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake transform shell"
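A filename containing {{domain}} is rendered once per domain at DAG-generation time, so a single .sl.yml configuration can yield several Python files. The substitution can be pictured like this (a sketch with a hypothetical domain list; the real generator uses its own templating):

```python
# Hypothetical domains; in practice these come from your project's metadata.
domains = ["sales", "hr"]

template = "dagster_{{domain}}_tasks.py"

# One generated DAG file per domain.
filenames = [template.replace("{{domain}}", d) for d in domains]
print(filenames)  # ['dagster_sales_tasks.py', 'dagster_hr_tasks.py']
```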

Transform with Multi Asset Sensor

metadata/dags/dagster_shell_transform_sensor.sl.yml
dag:
  comment: "Run transforms using Shell with Multi Asset Sensor"
  template: "transform/dagster_scheduled_task_shell.py.j2"
  filename: "dagster_{{domain}}_tasks_sensor.py"
  options:
    sl_env_var: >-
      {
        "SL_ROOT": "/opt/starlake",
        "SL_DATASETS": "/opt/starlake/datasets"
      }
    SL_STARLAKE_PATH: "/usr/local/bin/starlake"

    # Dependencies handled by Dagster Multi Asset Sensor instead of inline
    run_dependencies_first: "false"

    retries: "2"
    retry_delay: "60"
    tags: "starlake transform shell sensor"
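Note that tags is a single space-separated string, not a YAML list; presumably the template splits it on whitespace before attaching the individual tags to the generated job, along these lines:

```python
# The tags option as written in the YAML above.
tags_option = "starlake transform shell sensor"

# Split the space-separated string into individual tags.
tags = tags_option.split()
print(tags)  # ['starlake', 'transform', 'shell', 'sensor']
```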

GCP Cloud Run

Use StarlakeDagsterCloudRunJob to execute starlake commands via Cloud Run jobs. Best for serverless GCP deployments.

Prerequisites

Before configuring Starlake to use Cloud Run, you must create the Cloud Run job in your GCP project. This is different from a Cloud Run service — jobs run to completion and exit, which is the execution model Starlake uses.

# Create the Cloud Run job with the starlake Docker image
gcloud run jobs create starlake-load \
  --image=gcr.io/my-gcp-project/starlake:latest \
  --region=europe-west1 \
  --project=my-gcp-project \
  --service-account=starlake-sa@my-gcp-project.iam.gserviceaccount.com \
  --memory=2Gi \
  --cpu=1 \
  --task-timeout=3600s \
  --max-retries=0

# Create a separate job for transforms (optional; you can reuse the same job)
gcloud run jobs create starlake-transform \
  --image=gcr.io/my-gcp-project/starlake:latest \
  --region=europe-west1 \
  --project=my-gcp-project \
  --service-account=starlake-sa@my-gcp-project.iam.gserviceaccount.com \
  --memory=4Gi \
  --cpu=2 \
  --task-timeout=7200s \
  --max-retries=0

The cloud_run_job_name option in your DAG configuration must match the job name created above (e.g. starlake-load, starlake-transform).

tip

Set --max-retries=0 on the Cloud Run job itself and let the orchestrator (Dagster) handle retries via the retries option. This avoids double retry loops.
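The retries and retry_delay options (delay in seconds) feed the orchestrator's retry policy. With retries: "3" and retry_delay: "120", a job that fails on every attempt runs up to four times and spends at least six minutes waiting between attempts, assuming a fixed (non-backoff) delay:

```python
retries = 3        # value of the "retries" option
retry_delay = 120  # value of the "retry_delay" option, in seconds

total_attempts = 1 + retries        # the initial run plus the retries
total_wait = retries * retry_delay  # seconds spent waiting, fixed delay

print(total_attempts, total_wait)  # 4 360
```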

Load

metadata/dags/dagster_cloud_run_load.sl.yml
dag:
  comment: "Load tables using Cloud Run on Dagster"
  template: "load/dagster_scheduled_table_cloud_run.py.j2"
  filename: "dagster_{{domain}}_cloud_run_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_DATASETS": "gs://my-bucket/starlake/datasets",
        "SL_ENV": "BIGQUERY"
      }

    # ── Cloud Run execution ───────────────────────────────────────────
    # GCP project ID (defaults to $GCP_PROJECT env var)
    cloud_run_project_id: "my-gcp-project"
    # Name of the Cloud Run job to execute (REQUIRED)
    cloud_run_job_name: "starlake-load"
    # Region where the Cloud Run job is deployed
    cloud_run_job_region: "europe-west1"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "Europe/Paris"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake load cloud_run"

Transform

metadata/dags/dagster_cloud_run_transform.sl.yml
dag:
  comment: "Run transforms using Cloud Run on Dagster"
  template: "transform/dagster_scheduled_task_cloud_run.py.j2"
  filename: "dagster_{{domain}}_cloud_run_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_DATASETS": "gs://my-bucket/starlake/datasets",
        "SL_ENV": "BIGQUERY"
      }

    # ── Cloud Run execution ───────────────────────────────────────────
    cloud_run_project_id: "my-gcp-project"
    cloud_run_job_name: "starlake-transform"
    cloud_run_job_region: "europe-west1"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "Europe/Paris"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake transform cloud_run"

GCP Dataproc

Use StarlakeDagsterDataprocJob to submit starlake commands as Spark jobs on a Dataproc cluster. Best for Spark-based workloads on GCP.

Load

metadata/dags/dagster_dataproc_load.sl.yml
dag:
  comment: "Load tables using Dataproc on Dagster"
  template: "load/dagster_scheduled_table_dataproc.py.j2"
  filename: "dagster_{{domain}}_dataproc_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_ENV": "BIGQUERY"
      }

    # ── Dataproc cluster ──────────────────────────────────────────────
    dataproc_project_id: "my-gcp-project"
    dataproc_region: "europe-west1"
    dataproc_subnet: "default"
    dataproc_name: "starlake-cluster"
    dataproc_image_version: "2.2-debian12"
    dataproc_idle_delete_ttl: "3600"

    # ── Dataproc master node ──────────────────────────────────────────
    dataproc_master_machine_type: "n1-standard-4"
    dataproc_master_disk_type: "pd-standard"
    dataproc_master_disk_size: "1024"

    # ── Dataproc worker nodes ─────────────────────────────────────────
    dataproc_num_workers: "4"
    dataproc_worker_machine_type: "n1-standard-4"
    dataproc_worker_disk_type: "pd-standard"
    dataproc_worker_disk_size: "1024"

    # ── Spark configuration ───────────────────────────────────────────
    spark_jar_list: "gs://my-bucket/jars/starlake-assembly.jar"
    spark_job_main_class: "ai.starlake.job.Main"
    spark_bucket: "my-spark-bucket"
    spark_executor_memory: "8g"
    spark_executor_cores: "4"
    spark_executor_instances: "2"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "300"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake load dataproc"
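When sizing spark_executor_* against the cluster, make sure the requested executors actually fit on the workers. An n1-standard-4 machine has 4 vCPUs and 15 GB of RAM, so 2 executors at 8g/4 cores each occupy two workers almost entirely. A back-of-the-envelope capacity check (a sketch that ignores YARN and OS overhead):

```python
# Worker shape for n1-standard-4 (4 vCPUs, 15 GB RAM per machine).
num_workers = 4
worker_cores = 4
worker_memory_gb = 15

# Requested Spark executors from the config above.
executor_instances = 2
executor_cores = 4
executor_memory_gb = 8

cores_needed = executor_instances * executor_cores        # 8
memory_needed = executor_instances * executor_memory_gb   # 16 GB

# The request must fit inside the cluster's aggregate capacity.
assert cores_needed <= num_workers * worker_cores
assert memory_needed <= num_workers * worker_memory_gb
print(cores_needed, memory_needed)  # 8 16
```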

Transform

metadata/dags/dagster_dataproc_transform.sl.yml
dag:
  comment: "Run transforms using Dataproc on Dagster"
  template: "transform/dagster_scheduled_task_dataproc.py.j2"
  filename: "dagster_{{domain}}_dataproc_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "gs://my-bucket/starlake",
        "SL_ENV": "BIGQUERY"
      }

    # ── Dataproc cluster ──────────────────────────────────────────────
    dataproc_project_id: "my-gcp-project"
    dataproc_region: "europe-west1"
    dataproc_name: "starlake-cluster"
    dataproc_idle_delete_ttl: "3600"

    # ── Dataproc nodes ────────────────────────────────────────────────
    dataproc_master_machine_type: "n1-standard-8"
    dataproc_num_workers: "4"
    dataproc_worker_machine_type: "n1-standard-4"

    # ── Spark configuration ───────────────────────────────────────────
    spark_jar_list: "gs://my-bucket/jars/starlake-assembly.jar"
    spark_bucket: "my-spark-bucket"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "2"
    retry_delay: "300"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake transform dataproc"

AWS Fargate

Use StarlakeDagsterFargateJob to execute starlake commands as ECS tasks on AWS Fargate. Best for serverless AWS deployments.

Load

metadata/dags/dagster_fargate_load.sl.yml
dag:
  comment: "Load tables using Fargate on Dagster"
  template: "load/dagster_scheduled_table_fargate.py.j2"
  filename: "dagster_{{domain}}_fargate_load.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "s3://my-bucket/starlake",
        "SL_DATASETS": "s3://my-bucket/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── AWS Fargate execution ─────────────────────────────────────────
    # AWS profile name
    aws_profile: "default"
    # AWS region
    aws_region: "eu-west-1"
    # ECS cluster name (REQUIRED)
    aws_cluster_name: "starlake-ecs-cluster"
    # ECS task definition name (REQUIRED)
    aws_task_definition_name: "starlake-load-task"
    # Container name in the task definition (REQUIRED)
    aws_task_definition_container_name: "starlake"
    # Private subnets for the task (REQUIRED, JSON array)
    aws_task_private_subnets: '["subnet-0abc123def456789a", "subnet-0def456abc789012b"]'
    # Security groups for the task (REQUIRED, JSON array)
    aws_task_security_groups: '["sg-0abc123def456789a"]'
    # CPU units for the container override (1024 = 1 vCPU)
    cpu: "1024"
    # Memory in MB for the container override
    memory: "2048"

    # ── Pre-load strategy ─────────────────────────────────────────────
    pre_load_strategy: "imported"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake load fargate"
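Fargate only accepts specific CPU/memory pairings (for example, 1024 CPU units supports 2048 to 8192 MB in 1024 MB increments), so the cpu and memory overrides must form a valid pair. A small validation sketch for the common sizes, based on the pairings in the AWS task-definition documentation:

```python
# Valid memory values (MB) per Fargate CPU setting, common sizes only.
VALID_MEMORY = {
    256: [512, 1024, 2048],
    512: list(range(1024, 4097, 1024)),
    1024: list(range(2048, 8193, 1024)),
    2048: list(range(4096, 16385, 1024)),
    4096: list(range(8192, 30721, 1024)),
}

def is_valid_pair(cpu: int, memory: int) -> bool:
    """Return True if the cpu/memory override is a pairing Fargate accepts."""
    return memory in VALID_MEMORY.get(cpu, [])

print(is_valid_pair(1024, 2048))  # True  (the load config above)
print(is_valid_pair(1024, 3000))  # False (not a supported increment)
```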

Transform

metadata/dags/dagster_fargate_transform.sl.yml
dag:
  comment: "Run transforms using Fargate on Dagster"
  template: "transform/dagster_scheduled_task_fargate.py.j2"
  filename: "dagster_{{domain}}_fargate_tasks.py"
  options:
    # ── Starlake environment ──────────────────────────────────────────
    sl_env_var: >-
      {
        "SL_ROOT": "s3://my-bucket/starlake",
        "SL_DATASETS": "s3://my-bucket/starlake/datasets",
        "SL_ENV": "PROD"
      }

    # ── AWS Fargate execution ─────────────────────────────────────────
    aws_profile: "default"
    aws_region: "eu-west-1"
    aws_cluster_name: "starlake-ecs-cluster"
    aws_task_definition_name: "starlake-transform-task"
    aws_task_definition_container_name: "starlake"
    aws_task_private_subnets: '["subnet-0abc123def456789a", "subnet-0def456abc789012b"]'
    aws_task_security_groups: '["sg-0abc123def456789a"]'
    # More resources for transforms
    cpu: "2048"
    memory: "4096"

    # ── Dependencies ──────────────────────────────────────────────────
    run_dependencies_first: "true"

    # ── Scheduling ────────────────────────────────────────────────────
    start_date: "2024-01-01"
    timezone: "UTC"

    # ── Retry policy ──────────────────────────────────────────────────
    retries: "3"
    retry_delay: "120"

    # ── Tags ──────────────────────────────────────────────────────────
    tags: "starlake transform fargate"

Quick comparison

| Strategy | Best for | Cloud provider | Key required options |
|---|---|---|---|
| Shell | On-premise, local dev | Any | SL_STARLAKE_PATH |
| Cloud Run | Serverless GCP | GCP | cloud_run_job_name |
| Dataproc | Spark workloads | GCP | spark_jar_list, spark_bucket, dataproc_project_id |
| Fargate | Serverless AWS | AWS | aws_cluster_name, aws_task_definition_name, aws_task_definition_container_name, aws_task_private_subnets, aws_task_security_groups |

Minimal configurations

If you want the simplest possible setup, here are minimal configurations that rely on defaults:

Minimal Shell

dag:
  template: "load/dagster_scheduled_table_shell.py.j2"
  filename: "dagster_all_load.py"

Minimal Cloud Run

dag:
  template: "load/dagster_scheduled_table_cloud_run.py.j2"
  filename: "dagster_all_load.py"
  options:
    cloud_run_job_name: "starlake-load"

Minimal Fargate

dag:
  template: "load/dagster_scheduled_table_fargate.py.j2"
  filename: "dagster_all_load.py"
  options:
    aws_cluster_name: "my-cluster"
    aws_task_definition_name: "starlake-task"
    aws_task_definition_container_name: "starlake"
    aws_task_private_subnets: '["subnet-abc123"]'
    aws_task_security_groups: '["sg-abc123"]'
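The subnet and security-group options are JSON arrays embedded inside YAML strings, so the single-quote/double-quote nesting matters: single quotes wrap the YAML value, double quotes stay inside for JSON. A quick way to sanity-check a value before generating the DAG (illustrative):

```python
import json

# Exactly the YAML string values from the minimal Fargate config above.
aws_task_private_subnets = '["subnet-abc123"]'
aws_task_security_groups = '["sg-abc123"]'

# Each option must parse as a JSON list of strings.
subnets = json.loads(aws_task_private_subnets)
groups = json.loads(aws_task_security_groups)
assert isinstance(subnets, list) and all(isinstance(s, str) for s in subnets)

print(subnets, groups)  # ['subnet-abc123'] ['sg-abc123']
```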